延時隊列,顧名思義是帶有延時功能的消息隊列,列舉幾個使用場景:
- 定時發公告
- 用戶下單30分鐘後未付款自動關閉訂單
- 用戶下單後延時短信提醒
- 延時關閉空閒客戶端連接
實現思路:使用 Redis 提供的有序集合 zset 存放延時消息,把消息的到期時間戳作為 score;定時任務每秒取出 score 不大於當前時間的消息,將其從 zset 中刪除並轉移到 list 隊列,再由消費者線程從該隊列阻塞取出並消費。
@Slf4j
@Service
public class RedisDelayQueue {
@Resource
private StringRedisTemplate stringRedisTemplate;
private ScheduledExecutorService timer = Executors.newScheduledThreadPool(16);
private List<string> consumeTopics = new ArrayList<>(64);/<string>
@PostConstruct
public void init() {
Set<string> topicList = stringRedisTemplate.opsForSet().members("topicList");/<string>
if (topicList != null) {
topicList.forEach(this::registTopic);
}
}
private void registTopic(String topic) {
log.info("註冊監聽topic消息:{}", topic);
timer.scheduleAtFixedRate(() -> {
Set<string> msgs = stringRedisTemplate.opsForZSet().rangeByScore(topic, 0, System.currentTimeMillis(), 0, 1000);/<string>
if (msgs != null && msgs.size() > 0) {
Long remove = stringRedisTemplate.opsForZSet().remove(topic, msgs.toArray());
//刪除結果大於0代表 搶到了
if( remove != null && remove> 0 ){
stringRedisTemplate.opsForList().leftPushAll(topic + "queue", msgs);
}
}
}, 1, 1, TimeUnit.SECONDS);
}
public void produce(String topic, String msg, Date date) {
log.info("topic:{} 生產消息:{},於{}消費", topic, msg, date);
Long addSuccess = stringRedisTemplate.opsForSet().add("topicList", topic);
if (addSuccess != null && addSuccess > 0) {
registTopic(topic);
}
stringRedisTemplate.opsForZSet().add(topic, msg, date.getTime());
}
public synchronized void consumer(String topic, Function<string> consumer) {/<string>
if (consumeTopics.contains(topic)) {
throw new RuntimeException("請勿重複監聽消費" + topic);
}
consumeTopics.add(topic);
int consumerPoolSize = 10;
ExecutorService consumerPool = Executors.newFixedThreadPool(consumerPoolSize);
for (int i = 0; i < consumerPoolSize; i++) {
consumerPool.submit(() -> {
do {
log.info("循環取消息:{}", topic);
String msg;
try {
msg = stringRedisTemplate.opsForList().rightPop(topic + "queue", 1000, TimeUnit.MINUTES);
} catch (QueryTimeoutException e) {
log.debug("監聽超時,重試中!");
continue;
}
log.info("{}監聽到消息:{}", topic, msg);
if (msg != null) {
Boolean consumerSuccess;
try {
consumerSuccess = consumer.apply(msg);
} catch (Exception e) {
log.warn("消費失敗!", e);
consumerSuccess = false;
}
//消費失敗,1分鐘後再重試
if (consumerSuccess == null || !consumerSuccess) {
log.info("消費失敗,重新放回隊列。msg:{},topic:{}", msg, topic);
produce(topic, msg, new Date(System.currentTimeMillis() + 60000));
}
}
} while (true);
});
}
}
}
單元測試
/**
 * Spring Boot tests that drive {@link RedisDelayQueue} end to end.
 * Presumably requires a reachable Redis instance configured for the application context.
 */
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisDelayQueueTest {

    @Resource
    private RedisDelayQueue redisDelayQueue;

    /**
     * Produces 30 messages spread round-robin over topic0..topic2; message i is due
     * i seconds after the moment it is produced.
     */
    @Test
    public void produce() {
        int index = 0;
        while (index < 30) {
            String topic = "topic" + index % 3;
            String payload = "hello message" + index;
            Date dueAt = new Date(System.currentTimeMillis() + index * 1000);
            redisDelayQueue.produce(topic, payload, dueAt);
            index++;
        }
    }

    /**
     * Registers a logging consumer on each of the three topics, then keeps the test
     * thread alive for 10 minutes so the consumer threads can run.
     */
    @Test
    public void consumer() throws InterruptedException {
        String[] topics = {"topic0", "topic1", "topic2"};
        for (String topic : topics) {
            redisDelayQueue.consumer(topic, received -> {
                log.info("topic【{}】收到消息:{}", topic, received);
                return true;
            });
        }
        TimeUnit.MINUTES.sleep(10);
    }
}
閱讀更多 光谷程序員 的文章