HelloCoder HelloCoder
首页
《Java小白求职之路》
《小白学Java》
计算机毕设
  • 一些免费计算机资源
  • 脚手架工具
  • 《从0到1学习Java多线程》
  • 《从0到1搭建服务器》
  • 《可观测和监控》
随笔
关于作者
首页
《Java小白求职之路》
《小白学Java》
计算机毕设
  • 一些免费计算机资源
  • 脚手架工具
  • 《从0到1学习Java多线程》
  • 《从0到1搭建服务器》
  • 《可观测和监控》
随笔
关于作者
  • 《LearnJavaToFindAJob》

    • 导读

    • 【初级】6~12k档

    • 【中级】12k-26k档

    • 【高级】26k+档

      • 如何实现一个延时队列
      • 高级技术面试
    • 大厂面试题

    • 求职建议

    • 面经

  • LearnJavaToFindAJob
  • 【高级】26k+档
#【高级】26k+档
码农阿雨
2026-03-18
目录

如何实现一个延时队列

# 什么是延时队列?

延时队列是一种特殊的消息队列,它的核心特点是:消息在入队后不会立即被消费,而是需要等待指定的延迟时间后才能被消费。

简单来说:生产者提交带延迟时间的消息 → 队列暂存 → 时间到期 → 消费者才能拿到消息。

# 应用场景

  • 订单超时取消:用户下单后30分钟未支付自动取消
  • 定时任务调度:特定时间执行某个任务
  • 重试机制:失败后延迟一段时间重试
  • 缓存过期处理:缓存过期后延迟清理

# 1. 基于JDK的DelayQueue

适用场景:单机、轻量、无需引入中间件,最简单的实现方式。

# 核心依赖

  • java.util.concurrent.DelayQueue:JDK 自带的无界阻塞延时队列

  • java.util.concurrent.Delayed:延时任务必须实现的接口

public static void main(String[] args) {
        // 创建延时队列
        DelayQueue<DelayTask> delayQueue = new DelayQueue<>();

        // 生产者线程:添加延时任务
        new Thread(() -> {
            delayQueue.put(new DelayTask("订单001", 5));  // 5秒后执行
            delayQueue.put(new DelayTask("订单002", 2));  // 2秒后执行
            System.out.println("已提交2个延时任务");
        }).start();

        // 消费者线程:循环获取到期任务
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    // take() 会阻塞,直到有任务到期
                    DelayTask task = delayQueue.take();
                    System.out.println("执行任务:" + task.getTaskId() + ",执行时间:" + System.currentTimeMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        consumer.start();
    }


    static class DelayTask implements Delayed {
        private final String taskId;       // 任务ID
        private final long expireTime;     // 到期时间(毫秒时间戳)

        // 构造方法:传入延迟时间(秒)
        public DelayTask(String taskId, long delaySeconds) {
            this.taskId = taskId;
            this.expireTime = System.currentTimeMillis() + delaySeconds * 1000;
        }

        // 核心方法:返回剩余延迟时间(<=0 代表任务到期)
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // 排序:按到期时间升序(队列优先弹出最早到期的任务)
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }

        public String getTaskId() {
            return taskId;
        }

        @Override
        public String toString() {
            return "DelayedTask{" +
                    "taskId='" + taskId + '\'' +
                    ", expireTime=" + expireTime +
                    '}';
        }
    }

输出:

已提交2个延时任务
执行任务:订单002,执行时间:1773847704804
执行任务:订单001,执行时间:1773847707807

# 2. 定时任务框架(ScheduledExecutorService)

适用场景:单机、简单的定时 / 延时任务,无需手动管理队列。

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledDemo {
    public static void main(String[] args) {
        // 创建调度线程池
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

        // 延时执行:延迟3秒,执行一次
        executor.schedule(() -> {
            System.out.println("延时3秒执行的任务");
        }, 3, TimeUnit.SECONDS);

        // 固定频率执行(了解)
        executor.scheduleAtFixedRate(()->{
            System.out.println("每隔2秒执行一次");
        },0,2,TimeUnit.SECONDS);
    }
}

# 3. Redis 实现(分布式首选)

适用场景:分布式系统、多服务共享延时任务,生产环境最常用。

# 实现原理

利用 Redis 的 ZSet(有序集合):

  • score = 消息到期时间戳
  • 消费者轮询 ZSet,取出 score ≤ 当前时间的消息

# 代码示例(基于 RedisTemplate)

// 生产者:添加延时消息
public void addDelayMessage(String key, String message, long delaySeconds) {
    long score = System.currentTimeMillis() + delaySeconds * 1000;
    redisTemplate.opsForZSet().add(key, message, score);
}

// 消费者:轮询获取到期消息
public void consumeDelayMessage(String key) {
    while (true) {
        long now = System.currentTimeMillis();
        // 取出 score <= now 的第一条消息
        Set<String> messages = redisTemplate.opsForZSet().rangeByScore(key, 0, now, 0, 1);
        if (!messages.isEmpty()) {
            String msg = messages.iterator().next();
            // 删除并消费
            redisTemplate.opsForZSet().remove(key, msg);
            System.out.println("消费消息:" + msg);
        }
    }
}

# 4. 消息中间件(RabbitMQ/RocketMQ)

适用场景:高并发、高可用、大型分布式项目(企业级标准方案)。

  • RabbitMQ:死信队列 + TTL 实现
  • RocketMQ:原生支持延时消息(固定 18 个等级)
  • Kafka:借助时间轮实现

以rocketmq为例:

public class RocketMQDelayProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        
        // 创建消息
        Message message = new Message("DelayTopic", "TagA", 
            "订单ID:123456".getBytes(RemotingHelper.DEFAULT_CHARSET));
        
        // 设置延迟级别 - 3表示延迟10秒
        // 1:1s, 2:5s, 3:10s, 4:30s, 5:1m, 6:2m ...
        message.setDelayTimeLevel(3);
        
        // 发送消息
        SendResult sendResult = producer.send(message);
        System.out.printf("发送结果:%s%n", sendResult);
        
        producer.shutdown();
    }
}
阅读全文
×

(为防止恶意爬虫)
扫码或搜索:HelloCoder
发送:290992
即可永久解锁本站全部文章

解锁
#【高级】26k+档
上次更新: 2026-03-18 15:32:38
最近更新
01
MySQL支持的锁有哪些
03-18
02
用户态和内核态的区别
03-18
03
Synchronized相关
03-18
更多文章>
Theme by Vdoing | Copyright © 2020-2026 码农阿雨
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式