🎯 一句话定位 :一个告警重推需求,三种方案对比,一次关于「只有最适合的架构,没有最好的架构」的实战思考。
💡 核心理念 :架构设计不是秀技术栈,而是在约束条件下找到最小复杂度的解决方案。
📖 3分钟速览版
📊 点击展开架构选型决策树
🔌 三方案对比
维度
代码内存队列
Redis ZSet
RabbitMQ 死信队列
实现复杂度
⭐ 低
⭐⭐ 中
⭐⭐⭐ 高
可靠性
❌ 进程重启丢失
✅ 持久化保障
✅ 消息确认机制
额外依赖
无
Redis(已有)
需新引入 MQ
集群支持
❌ 多节点重复推送
✅ 分布式锁
✅ 原生支持
适用规模
单机/开发环境
中小型生产环境
大型/高可靠场景
运维成本
无
低(复用已有 Redis)
高(MQ 集群运维)
🎯 决策结论 graph TD
A[告警重推需求] --> B{系统已有 Redis?}
B -->|是| C{并发量 > 10K/s?}
B -->|否| D{允许引入新中间件?}
C -->|是| E[RabbitMQ 死信队列]
C -->|否| F[Redis ZSet ✅ 推荐]
D -->|是| E
D -->|否| G[代码内存队列]
style F fill:#c8e6c9,stroke:#333,color:#000
style E fill:#bbdefb,stroke:#333,color:#000
style G fill:#fff9c4,stroke:#333,color:#000
**🖼️ 插图版(2026-04-17 增量补充)**
本文结论 :系统已有 Redis、并发量中等、不希望引入新中间件 → Redis ZSet 是最优解 。
🧠 深度剖析版 1. 业务背景 1.1 需求描述 告警系统的核心流程:
系统检测到异常,发出告警通知(短信/邮件/钉钉)
运维人员在告警面板上确认 告警
如果 N 分钟后仍未确认 ,告警自动重新推送
重推可能有多档:5 分钟、30 分钟、2 小时
超过最大重试次数后升级告警(如通知上级)
1.2 技术约束
当前技术栈 :Spring Boot + MySQL + Redis(Lettuce)
未引入 MQ :项目中没有 RabbitMQ/Kafka/RocketMQ
并发量 :中小型系统,告警量 < 1000 条/天,峰值 QPS < 100
可靠性要求 :告警不能丢失,但允许偶尔延迟(秒级精度即可)
1.3 核心问题
如何实现「到达指定时间后自动触发某个操作」?
这本质上是一个 延时任务调度 问题。
2. 方案一:代码内存队列 2.1 实现原理 使用 JDK 自带的 DelayQueue + 守护线程轮询。
2.2 代码实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;public class AlertTask implements Delayed { private final String alertId; private final long triggerTime; public AlertTask (String alertId, long delayMs) { this .alertId = alertId; this .triggerTime = System.currentTimeMillis() + delayMs; } @Override public long getDelay (TimeUnit unit) { long diff = triggerTime - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); } @Override public int compareTo (Delayed other) { return Long.compare( this .triggerTime, ((AlertTask) other).triggerTime ); } public String getAlertId () { return alertId; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import java.util.concurrent.DelayQueue;@Component public class InMemoryAlertScheduler { private final DelayQueue<AlertTask> queue = new DelayQueue <>(); @PostConstruct public void startConsumer () { Thread consumer = new Thread (() -> { while (!Thread.currentThread().isInterrupted()) { try { AlertTask task = queue.take(); handleAlert(task.getAlertId()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break ; } } }, "alert-delay-consumer" ); consumer.setDaemon(true ); consumer.start(); } public void schedule (String alertId, long delayMs) { queue.put(new AlertTask (alertId, delayMs)); } private void handleAlert (String alertId) { } }
2.3 优缺点分析
优点
缺点
零外部依赖
进程重启后队列丢失
实现简单,10 分钟搞定
集群部署时多节点重复调度
精度高(毫秒级)
大量任务时内存压力大
适合开发/测试环境
无法跨 JVM 协调
结论 :适合原型验证和单机开发环境,不满足生产可靠性要求 。
3. 方案二:Redis ZSet 延时队列 3.1 核心原理 利用 ZSet 的 score 有序性 :
score = 任务触发时间戳
member = 任务 ID
定时轮询 ZRANGEBYSCORE,取出所有 score <= 当前时间 的任务
3.2 完整实现 任务入队 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Service public class AlertQueueService { private static final String QUEUE_KEY = "alert:delay:queue" ; @Autowired private StringRedisTemplate redisTemplate; public void enqueue (String alertId, long delayMs) { double triggerTime = System.currentTimeMillis() + delayMs; redisTemplate.opsForZSet().add(QUEUE_KEY, alertId, triggerTime); } public void cancel (String alertId) { redisTemplate.opsForZSet().remove(QUEUE_KEY, alertId); } public Long pendingCount () { return redisTemplate.opsForZSet().zCard(QUEUE_KEY); } }
定时消费(Lua 原子脚本) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 @Component public class AlertScheduler { private static final String QUEUE_KEY = "alert:delay:queue" ; private static final String LOCK_KEY = "alert:scheduler:lock" ; @Autowired private StringRedisTemplate redisTemplate; @Autowired private AlertNotifyService notifyService; private static final String FETCH_AND_REMOVE_LUA = "local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], '0', ARGV[1], 'LIMIT', 0, ARGV[2])\n" + "if #tasks > 0 then\n" + " redis.call('ZREM', KEYS[1], unpack(tasks))\n" + "end\n" + "return tasks" ; private final DefaultRedisScript<List> fetchScript; public AlertScheduler () { this .fetchScript = new DefaultRedisScript <>(); this .fetchScript.setScriptText(FETCH_AND_REMOVE_LUA); this .fetchScript.setResultType(List.class); } @Scheduled(fixedDelay = 10_000) public void pollExpiredAlerts () { Boolean locked = redisTemplate.opsForValue() .setIfAbsent(LOCK_KEY, "1" , Duration.ofSeconds(30 )); if (!Boolean.TRUE.equals(locked)) { return ; } try { String now = String.valueOf(System.currentTimeMillis()); @SuppressWarnings("unchecked") List<String> alertIds = redisTemplate.execute( fetchScript, List.of(QUEUE_KEY), now, "50" ); if (alertIds != null && !alertIds.isEmpty()) { for (String alertId : alertIds) { processAlert(alertId); } } } finally { redisTemplate.delete(LOCK_KEY); } } private void processAlert (String alertId) { try { if (!notifyService.isAcknowledged(alertId)) { notifyService.resend(alertId); } } catch (Exception e) { redisTemplate.opsForZSet().add( QUEUE_KEY, alertId, System.currentTimeMillis() + 60_000 ); } } }
启动类配置 1 2 3 4 5 6 7 @SpringBootApplication @EnableScheduling public class AlertApplication { public static void main (String[] args) { SpringApplication.run(AlertApplication.class, args); } }
3.3 可靠性分析
场景
表现
说明
进程重启
✅ 不丢失
任务在 Redis 中,重启后继续消费
多节点部署
✅ 不重复
Lua 脚本原子取出 + 分布式锁
Redis 宕机
⚠️ 取决于持久化
AOF everysec 最多丢 1 秒数据
大量任务堆积
✅ 可控
ZSet 内存占用小,百万级无压力
3.4 告警归并并发坑:先处理后删除为什么会误删新任务 上面的示例采用的是“原子取出并删除,再处理”的消费模型,适合告警重推。 但很多告警系统还有另一个需求:同一设备、同一规则、同一时间窗口内的 告警要先归并,满足条件后再生成一条聚合告警。
这时不少实现会变成:
REST 接口收到告警后,满足归并条件就直接归并处理
不满足条件就按 mergeKey 写入 ZSet,等待定时任务扫描
定时任务扫到到期 mergeKey 后,先执行归并处理
归并成功后再从 ZSet 删除这个 mergeKey
问题就在第 4 步:如果处理期间又有新告警进来,ZREM 删除的可能不是 旧任务,而是新告警刚写入的延迟任务。
sequenceDiagram
participant Job as 定时任务
participant Redis as Redis ZSet
participant Api as REST 接口
participant DB as 告警明细库
Job->>Redis: 读取到期 mergeKey=M, score=100
Job->>DB: 开始归并 M 的旧告警
Api->>DB: 写入 M 的新告警明细
Api->>Redis: ZADD M score=200
Job->>DB: 旧告警归并完成
Job->>Redis: ZREM M
Redis-->>Api: 新延迟任务被误删
这个竞态的根因是:ZSet 的 member 只有一个,同一个 mergeKey 再次ZADD 会覆盖 score;而普通 ZREM mergeKey 无法区分它删除的是“旧版本 任务”还是“新版本任务”。
3.5 四种修复方式对比
方案
核心思路
优点
风险
建议
先删再处理
扫描到期后先 ZREM,再执行业务归并
不会误删新任务
进程宕机可能丢任务
需配合 processing 队列
全链路加锁
REST 入队和定时归并都锁同一 mergeKey
语义最直观
接口耗时上升,锁过期难调
不作为首选
score/version 条件删除
删除前比较快照,只删旧版本任务
改动小,保留现有流程
归并逻辑仍需幂等
推荐
processing zset + lease
到期任务先搬到处理中队列,成功再 ack
可靠性最好
代码和监控都更复杂
后续演进
如果目标是“最小改动”,首选 score/version 条件删除。它不改变现有 REST 接口和定时任务的大结构,只是在队列任务上补一个版本栅栏。
3.6 最小改动方案:score/version 条件删除 设计里需要统一几个概念:
mergeKey:归并维度,例如设备 ID、告警类型、规则 ID 组合
QUEUE_KEY:Redis ZSet,存放待归并任务
VERSION_KEY:Redis Hash,存放每个 mergeKey 的版本号
safeAckLua:成功后条件删除脚本
safeRetryLua:失败后条件改 score 脚本
整体流程如下:
graph TD
A[新告警先落库] --> B[version +1 并 ZADD]
C[读取到期快照] --> D[归并处理 mergeKey]
D --> E{成功还是失败?}
E --> F[执行条件 ack/retry]
B -.并发更新.-> F
F --> G{快照仍有效?}
G -->|是| H[删除旧任务或改到重试时间]
G -->|否| I[保留新任务]
style B fill:#c8e6c9,stroke:#333,color:#000
style G fill:#fff3e0,stroke:#333,color:#000
style I fill:#bbdefb,stroke:#333,color:#000
入队:先落库,再原子更新版本和 score REST 接口不要只把数据放进 Redis。先把告警明细写入数据库或其他可靠 存储,再更新 Redis 队列。这样即使定时任务稍后处理,也能按 mergeKey 查到完整明细。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Service public class AlertMergeService { private static final String QUEUE_KEY = "alert:merge:delay:queue" ; private static final String VERSION_KEY = "alert:merge:version" ; private static final String ENQUEUE_LUA = """ local version = redis.call('HINCRBY', KEYS[2], ARGV[1], 1) redis.call('ZADD', KEYS[1], ARGV[2], ARGV[1]) return version """ ; private final StringRedisTemplate redisTemplate; private final AlertDetailMapper alertDetailMapper; private final AlertMergeMapper alertMergeMapper; public AlertMergeService ( StringRedisTemplate redisTemplate, AlertDetailMapper alertDetailMapper, AlertMergeMapper alertMergeMapper ) { this .redisTemplate = redisTemplate; this .alertDetailMapper = alertDetailMapper; this .alertMergeMapper = alertMergeMapper; } public long enqueue (AlertEvent event) { String mergeKey = buildMergeKey(event); long triggerTime = System.currentTimeMillis() + event.delayMs(); alertDetailMapper.insert(event.toEntity(mergeKey)); Long version = redisTemplate.execute( RedisScript.of(ENQUEUE_LUA, Long.class), List.of(QUEUE_KEY, VERSION_KEY), mergeKey, String.valueOf(triggerTime) ); return version == null ? 0L : version; } }
这里把 HINCRBY 和 ZADD 放进同一个 Lua 脚本,是为了避免 Redis 内部 出现“版本已增加但 score 还没更新”的中间态。
下面几段方法继续放在 AlertMergeService 中,为了便于讲解按职责拆开。
扫描:取到期任务时带上 score/version 快照 定时任务扫描时不要只拿 mergeKey,还要拿当时的 score 和 version。 这三个值组成快照,后面 ack 或 retry 都必须基于这份快照判断。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public record MergeTaskSnapshot ( String mergeKey, double score, long version ) {} public List<MergeTaskSnapshot> fetchExpired (long now, int limit) { Set<ZSetOperations.TypedTuple<String>> tuples = redisTemplate .opsForZSet() .rangeByScoreWithScores(QUEUE_KEY, 0 , now, 0 , limit); if (tuples == null || tuples.isEmpty()) { return List.of(); } List<MergeTaskSnapshot> snapshots = new ArrayList <>(); for (ZSetOperations.TypedTuple<String> tuple : tuples) { String mergeKey = tuple.getValue(); Double score = tuple.getScore(); Object version = redisTemplate.opsForHash() .get(VERSION_KEY, mergeKey); if (mergeKey != null && score != null && version != null ) { snapshots.add(new MergeTaskSnapshot ( mergeKey, score, Long.parseLong(version.toString()) )); } } return snapshots; }
处理:归并必须按 mergeKey 幂等 版本栅栏只能保证“不误删新任务”,不能替代业务幂等。归并处理本身仍然 要按 mergeKey 做幂等保护,例如:
归并结果表对 mergeKey + windowStart 建唯一键
状态流转使用 CAS,例如 PENDING -> MERGED
批量写入使用 upsert,避免重复定时扫描造成重复结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public void mergeBySnapshot (MergeTaskSnapshot snapshot) { List<AlertDetail> details = alertDetailMapper .selectPendingByMergeKey(snapshot.mergeKey()); if (details.isEmpty()) { return ; } AlertMergeResult result = AlertMergeResult.from( snapshot.mergeKey(), details ); int updated = alertMergeMapper.upsertByMergeKey(result); if (updated == 0 ) { return ; } alertDetailMapper.markMerged(snapshot.mergeKey()); }
成功:只删除仍然等于快照的任务 归并成功后不要直接 ZREM,而是执行 safeAckLua。只有当前 score 和 version 都还等于快照值,才说明没有新告警覆盖这个 mergeKey。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private static final String SAFE_ACK_LUA = """ local currentScore = redis.call('ZSCORE', KEYS[1], ARGV[1]) if not currentScore then return 0 end local currentVersion = redis.call('HGET', KEYS[2], ARGV[1]) if tonumber(currentScore) == tonumber(ARGV[2]) and currentVersion == ARGV[3] then return redis.call('ZREM', KEYS[1], ARGV[1]) end return 0 """ ;public boolean safeAck (MergeTaskSnapshot snapshot) { Long removed = redisTemplate.execute( RedisScript.of(SAFE_ACK_LUA, Long.class), List.of(QUEUE_KEY, VERSION_KEY), snapshot.mergeKey(), String.valueOf(snapshot.score()), String.valueOf(snapshot.version()) ); return Long.valueOf(1L ).equals(removed); }
如果 safeAck 返回 false,通常不是错误,而是处理期间有新告警进来。 此时新任务仍留在 ZSet 中,下一轮扫描会继续处理。
失败:只重试仍然等于快照的任务 处理失败也不要无脑 ZADD。如果失败期间已经有新告警进入,新的 score 应该保留;如果还是旧版本,才把它改到重试时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private static final String SAFE_RETRY_LUA = """ local currentScore = redis.call('ZSCORE', KEYS[1], ARGV[1]) if not currentScore then return 0 end local currentVersion = redis.call('HGET', KEYS[2], ARGV[1]) if tonumber(currentScore) == tonumber(ARGV[2]) and currentVersion == ARGV[3] then redis.call('ZADD', KEYS[1], ARGV[4], ARGV[1]) return 1 end return 0 """ ;public boolean safeRetry (MergeTaskSnapshot snapshot, long nextRetryTime) { Long updated = redisTemplate.execute( RedisScript.of(SAFE_RETRY_LUA, Long.class), List.of(QUEUE_KEY, VERSION_KEY), snapshot.mergeKey(), String.valueOf(snapshot.score()), String.valueOf(snapshot.version()), String.valueOf(nextRetryTime) ); return Long.valueOf(1L ).equals(updated); }
最终定时任务只需要把处理结果映射到 safeAck 或 safeRetry:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Scheduled(fixedDelay = 10_000) public void pollExpiredMergeTasks () { long now = System.currentTimeMillis(); for (MergeTaskSnapshot snapshot : fetchExpired(now, 50 )) { try { mergeBySnapshot(snapshot); safeAck(snapshot); } catch (Exception ex) { long nextRetryTime = System.currentTimeMillis() + 60_000 ; safeRetry(snapshot, nextRetryTime); } } } private String buildMergeKey (AlertEvent event) { return event.deviceId() + ":" + event.ruleId() + ":" + event.level(); }
这套方案的关键不是“让处理过程完全串行”,而是让删除动作具备条件: 旧任务只能确认旧任务,不能碰新任务。
4. 方案三:RabbitMQ 死信队列 4.1 实现原理 利用 TTL(消息过期时间)+ Dead Letter Exchange(死信交换机):
告警消息发送到 delay-queue,设置 TTL = 延迟时间
消息过期后,自动路由到死信交换机
死信交换机转发到 alert-process-queue
消费者从 alert-process-queue 取出并处理
4.2 配置示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Configuration public class RabbitAlertConfig { @Bean public Queue delayQueue () { Map<String, Object> args = new HashMap <>(); args.put("x-dead-letter-exchange" , "alert.dlx" ); args.put("x-dead-letter-routing-key" , "alert.process" ); return new Queue ("alert.delay.queue" , true , false , false , args); } @Bean public DirectExchange deadLetterExchange () { return new DirectExchange ("alert.dlx" ); } @Bean public Queue processQueue () { return new Queue ("alert.process.queue" ); } @Bean public Binding dlxBinding () { return BindingBuilder .bind(processQueue()) .to(deadLetterExchange()) .with("alert.process" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Service public class RabbitAlertProducer { @Autowired private RabbitTemplate rabbitTemplate; public void scheduleAlert (String alertId, long delayMs) { rabbitTemplate.convertAndSend( "" , "alert.delay.queue" , alertId, message -> { message.getMessageProperties() .setExpiration(String.valueOf(delayMs)); return message; } ); } }
4.3 引入 MQ 的代价 这不只是加一个依赖的事:
代价项
具体影响
运维成本
RabbitMQ 集群搭建、监控、Erlang 升级
学习曲线
团队需要理解交换机/绑定/死信/确认机制
故障域扩大
MQ 宕机 = 所有依赖 MQ 的功能不可用
网络拓扑
新增端口开放、防火墙规则、VPC 配置
消息积压
需要处理消费者跟不上的场景
资源占用
Erlang VM 内存开销大,至少 512MB 起步
5. 架构选型决策 5.1 决策核心:从需求出发 不要问「哪个技术最先进」,要问「哪个方案在当前约束下最合适」。
当前约束条件 :
✅ 系统已有 Redis(Lettuce 客户端,配置完善)
❌ 未引入任何消息队列
📊 告警量 < 1000 条/天,QPS < 100
👥 团队 3-5 人,运维能力有限
⏱️ 时间精度要求秒级即可
5.2 决策矩阵
决策因素
权重
代码队列
ZSet
RabbitMQ
可靠性
30%
2
8
10
实现成本
25%
10
8
4
运维成本
20%
10
9
3
扩展性
15%
2
7
10
团队熟悉度
10%
10
8
3
加权得分
-
6.3
8.0
6.1
加权得分显示,Redis ZSet 综合得分最高(8.0),是当前场景的最优选。
5.3 架构哲学:只有最适合的,没有最好的 反面案例 :某创业团队 5 人,日活 1000,引入了 Kafka + Redis Cluster + ElasticSearch + K8s,运维占用了 40% 的开发时间,核心功能反而推进缓慢。
正面案例 :某中型公司告警系统,用 Redis ZSet 跑了 2 年,日均处理 5000+ 告警任务,zero downtime。当业务量增长到 10 万级时再平滑迁移到 RocketMQ。
核心原则 :
YAGNI (You Aren’t Gonna Need It) :不要为假想的未来需求引入复杂度
最小依赖原则 :每引入一个中间件,系统的故障域就增加一个
渐进式演进 :先用简单方案上线,有数据支撑后再升级
引入一个新中间件的成本 ≠ Maven 加一行依赖的成本。它等于:学习成本 + 运维成本 + 故障处理成本 + 团队认知负担,这些才是真正的技术债。
6. 生产注意事项 6.1 轮询间隔的选择
间隔
精度
CPU 开销
适用场景
1 秒
高
中
秒级精度要求
5 秒
中
低
一般告警场景
10 秒
一般
极低
分钟级告警(推荐)
30 秒
低
极低
非实时场景
6.2 ZSet Key 的管理 1 2 3 4 5 6 7 8 9 10 11 12 13 String queueKey = "alert:delay:queue:" + LocalDate.now().format( DateTimeFormatter.ofPattern("yyyyMMdd" ) ); @Scheduled(cron = "0 0 3 * * ?") public void cleanExpiredKeys () { String yesterday = LocalDate.now().minusDays(1 ).format( DateTimeFormatter.ofPattern("yyyyMMdd" ) ); redisTemplate.delete("alert:delay:queue:" + yesterday); }
6.3 Redis 宕机降级策略 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Scheduled(fixedDelay = 10_000) public void pollExpiredAlerts () { try { doPoll(); } catch (RedisConnectionFailureException e) { log.error("Redis 连接失败,启用降级策略" ); fallbackFromMySQL(); } } private void fallbackFromMySQL () { List<Alert> expired = alertMapper.selectUnacknowledgedBefore( LocalDateTime.now().minusMinutes(5 ) ); for (Alert alert : expired) { notifyService.resend(alert.getId()); } }
💬 常见问题(FAQ) Q1: ZSet 和 Redisson 的 RDelayedQueue 有什么区别? Redisson 的 RDelayedQueue 底层也是基于 ZSet 实现的,封装了轮询、原子消费等逻辑。如果项目已引入 Redisson,可以直接使用:
1 2 3 4 5 6 7 8 RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("alert-queue" ); RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue); delayedQueue.offer("alert-123" , 5 , TimeUnit.MINUTES); String alertId = blockingQueue.take();
如果只用 Lettuce/Jedis,手动实现 ZSet 方案更轻量。
Q2: 如何保证 ZSet 消费的幂等性? 告警重推本身应该是幂等的(发送通知这个动作可以重复)。但如果涉及状态变更,使用以下策略:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void processAlert (String alertId) { Alert alert = alertMapper.selectById(alertId); if (alert == null || alert.isAcknowledged()) { return ; } int updated = alertMapper.updateStatusCAS( alertId, AlertStatus.PENDING, AlertStatus.RESENDING ); if (updated == 0 ) { return ; } notifyService.resend(alertId); }
Q3: 多节点部署时如何避免重复推送? 文章中的 Lua 脚本已经解决了重推场景的问题:ZRANGEBYSCORE +ZREM 在同一个 Lua 脚本中原子执行,只有一个节点能成功取出任务。 分布式锁是额外的保险层。
如果是告警归并场景,不能只看“重复推送”,还要看“新任务是否被旧任务 误删”。这时推荐使用 score/version 条件删除。
Q4: 如果 Redis 宕机,告警任务会丢失吗? 取决于 Redis 持久化配置:
配置
数据安全
性能影响
appendonly yes + appendfsync everysec
最多丢 1 秒
低
appendonly yes + appendfsync always
不丢数据
高
仅 RDB
可能丢几分钟
无
建议 :AOF everysec + 定期 RDB 快照。对于告警场景,1 秒的数据丢失可接受。
Q5: 什么情况下应该升级到 RabbitMQ? 当出现以下任一信号时,考虑升级:
告警量增长到 日均 10 万条以上
需要 多消费者组 (不同告警类型由不同团队处理)
需要 消息回溯/重放 (审计需求)
需要 跨系统事件驱动 (告警触发工单、告警触发自动扩容等)
Redis 内存压力大,ZSet 成为瓶颈
升级路径:ZSet 方案可以平滑过渡到 MQ——只需将入队逻辑从 ZADD 改为 send(),消费逻辑从轮询改为监听。
Q6: 为什么不直接先删再处理? 先删再处理可以避免旧任务误删新任务,但它会引入另一个问题:定时任务 删除成功后,如果 JVM 崩溃或业务归并超时,这个任务就没有地方恢复。
如果选择先删再处理,建议增加 processing zset:
到期任务从 QUEUE_KEY 搬到 PROCESSING_KEY
处理成功后从 PROCESSING_KEY ack
后台补偿任务扫描超时 lease,把任务搬回 QUEUE_KEY
这套方案可靠性更强,但已经不是“最小改动”。在已有 ZSet 方案上修补 并发删除问题,score/version 条件删除通常成本更低。
Q7: 只比较 score 够不够? 不够。score 是触发时间,理论上可能因为相同延迟、时间取整或重试策略而 重复。只比较 score 时,旧任务仍有机会误判为当前任务。
version 是单调递增的业务栅栏。每次 REST 接口为同一个 mergeKey 写入新告警时,都执行一次 HINCRBY。旧任务拿着旧 version 去 ack, 即使 score 巧合一致,也不会删除新任务。
Q8: 新告警和定时归并同时发生怎么办? 按推荐流程处理:
新告警先落库,再用 Lua 原子执行 version +1 和 ZADD
定时任务只拿 mergeKey + score + version 快照
归并逻辑按 mergeKey 做幂等
成功或失败后都用 Lua 比较快照,再决定 ack 或 retry
只要删除和重试动作都带条件,旧任务最多多处理一次,不会把新任务删掉。
✨ 总结 核心要点
延时任务不等于需要 MQ :Redis ZSet 在中小型场景下完全够用,可靠性有保障
引入新中间件的隐性成本远大于显性成本 :运维、学习、故障域扩大才是真正的代价
架构决策应基于约束条件而非技术偏好 :当前规模 + 现有技术栈 + 团队能力 = 最适合的方案
归并任务需要版本栅栏 :先处理后删除时,必须用 score/version 条件 ack 避免误删新任务
行动建议 今天就可以做的 :
评估当前系统是否有类似的延时任务需求(超时未支付取消、定时提醒等)
检查 Redis 是否已开启 AOF 持久化
排查归并类定时任务是否存在“先处理后 ZREM”的无条件删除
本周可以完成的 :
基于本文代码搭建一个 ZSet 延时队列的 demo
压测验证在目标并发量下的性能和可靠性
为告警归并任务补充 score/version 条件 ack 和条件 retry
长期持续改进的 :
监控 ZSet Key 的大小和消费延迟
制定从 ZSet 到 MQ 的升级触发条件和迁移方案
当可靠性要求继续提高时,演进到 processing zset + lease 模型
更新记录
版本
日期
说明
v1.0
2026-03-23
初始版本
v1.1
2026-04-17
为 1 个 Mermaid 图表追加 Chiikawa 风格插图(m2c-pipeline 生成)
v1.2
2026-04-27
补充告警归并并发删除问题与最小改动方案