如何实现一个延时队列
# 什么是延时队列?
延时队列是一种特殊的消息队列,它的核心特点是:消息在入队后不会立即被消费,而是需要等待指定的延迟时间后才能被消费。
简单来说:生产者提交带延迟时间的消息 → 队列暂存 → 时间到期 → 消费者才能拿到消息。
# 应用场景
- 订单超时取消:用户下单后30分钟未支付自动取消
- 定时任务调度:特定时间执行某个任务
- 重试机制:失败后延迟一段时间重试
- 缓存过期处理:缓存过期后延迟清理
# 1. 基于JDK的DelayQueue
适用场景:单机、轻量、无需引入中间件,最简单的实现方式。
# 核心依赖
java.util.concurrent.DelayQueue:JDK 自带的无界阻塞延时队列java.util.concurrent.Delayed:延时任务必须实现的接口
public static void main(String[] args) {
// 创建延时队列
DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
// 生产者线程:添加延时任务
new Thread(() -> {
delayQueue.put(new DelayTask("订单001", 5)); // 5秒后执行
delayQueue.put(new DelayTask("订单002", 2)); // 2秒后执行
System.out.println("已提交2个延时任务");
}).start();
// 消费者线程:循环获取到期任务
Thread consumer = new Thread(() -> {
while (true) {
try {
// take() 会阻塞,直到有任务到期
DelayTask task = delayQueue.take();
System.out.println("执行任务:" + task.getTaskId() + ",执行时间:" + System.currentTimeMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
consumer.start();
}
static class DelayTask implements Delayed {
private final String taskId; // 任务ID
private final long expireTime; // 到期时间(毫秒时间戳)
// 构造方法:传入延迟时间(秒)
public DelayTask(String taskId, long delaySeconds) {
this.taskId = taskId;
this.expireTime = System.currentTimeMillis() + delaySeconds * 1000;
}
// 核心方法:返回剩余延迟时间(<=0 代表任务到期)
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 排序:按到期时间升序(队列优先弹出最早到期的任务)
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
public String getTaskId() {
return taskId;
}
@Override
public String toString() {
return "DelayedTask{" +
"taskId='" + taskId + '\'' +
", expireTime=" + expireTime +
'}';
}
}
输出:
已提交2个延时任务
执行任务:订单002,执行时间:1773847704804
执行任务:订单001,执行时间:1773847707807
# 2. 定时任务框架(ScheduledExecutorService)
适用场景:单机、简单的定时 / 延时任务,无需手动管理队列。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledDemo {
public static void main(String[] args) {
// 创建调度线程池
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 延时执行:延迟3秒,执行一次
executor.schedule(() -> {
System.out.println("延时3秒执行的任务");
}, 3, TimeUnit.SECONDS);
// 固定频率执行(了解)
executor.scheduleAtFixedRate(()->{
System.out.println("每隔2秒执行一次");
},0,2,TimeUnit.SECONDS);
}
}
# 3. Redis 实现(分布式首选)
适用场景:分布式系统、多服务共享延时任务,生产环境最常用。
# 实现原理
利用 Redis 的 ZSet(有序集合):
- score = 消息到期时间戳
- 消费者轮询 ZSet,取出 score ≤ 当前时间的消息
# 代码示例(基于 RedisTemplate)
// 生产者:添加延时消息
public void addDelayMessage(String key, String message, long delaySeconds) {
long score = System.currentTimeMillis() + delaySeconds * 1000;
redisTemplate.opsForZSet().add(key, message, score);
}
// 消费者:轮询获取到期消息
public void consumeDelayMessage(String key) {
while (true) {
long now = System.currentTimeMillis();
// 取出 score <= now 的第一条消息
Set<String> messages = redisTemplate.opsForZSet().rangeByScore(key, 0, now, 0, 1);
if (!messages.isEmpty()) {
String msg = messages.iterator().next();
// 删除并消费
redisTemplate.opsForZSet().remove(key, msg);
System.out.println("消费消息:" + msg);
}
}
}
# 4. 消息中间件(RabbitMQ/RocketMQ)
适用场景:高并发、高可用、大型分布式项目(企业级标准方案)。
- RabbitMQ:死信队列 + TTL 实现
- RocketMQ:原生支持延时消息(固定 18 个等级)
- Kafka:借助时间轮实现
以rocketmq为例:
public class RocketMQDelayProducer {
public static void main(String[] args) throws Exception {
// 创建生产者
DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 创建消息
Message message = new Message("DelayTopic", "TagA",
"订单ID:123456".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置延迟级别 - 3表示延迟10秒
// 1:1s, 2:5s, 3:10s, 4:30s, 5:1m, 6:2m ...
message.setDelayTimeLevel(3);
// 发送消息
SendResult sendResult = producer.send(message);
System.out.printf("发送结果:%s%n", sendResult);
producer.shutdown();
}
}
上次更新: 2026-03-18 15:32:38