redis 延時隊列

延時隊列,顧名思義是帶有延時功能的消息隊列,列舉幾個使用場景:

  1. 定時發公告
  2. 用戶下單30分鐘後未付款自動關閉訂單
  3. 用戶下單後延時短信提醒
  4. 延時關閉空閒客戶端連接
redis 延時隊列

使用 Redis 提供的有序集合(zset)保存消息,並把消息的到期時間戳作為 score;定時任務每秒按 score 範圍取出已到期的消息,從 zset 中移除後轉入 list,消費者再從該 list 阻塞讀取。

redis 延時隊列

@Slf4j

@Service

public class RedisDelayQueue {

@Resource

private StringRedisTemplate stringRedisTemplate;

private ScheduledExecutorService timer = Executors.newScheduledThreadPool(16);

private List<string> consumeTopics = new ArrayList<>(64);/<string>

@PostConstruct

public void init() {

Set<string> topicList = stringRedisTemplate.opsForSet().members("topicList");/<string>

if (topicList != null) {

topicList.forEach(this::registTopic);

}

}

private void registTopic(String topic) {

log.info("註冊監聽topic消息:{}", topic);

timer.scheduleAtFixedRate(() -> {

Set<string> msgs = stringRedisTemplate.opsForZSet().rangeByScore(topic, 0, System.currentTimeMillis(), 0, 1000);/<string>

if (msgs != null && msgs.size() > 0) {

Long remove = stringRedisTemplate.opsForZSet().remove(topic, msgs.toArray());

//刪除結果大於0代表 搶到了

if( remove != null && remove> 0 ){

stringRedisTemplate.opsForList().leftPushAll(topic + "queue", msgs);

}

}

}, 1, 1, TimeUnit.SECONDS);

}

public void produce(String topic, String msg, Date date) {

log.info("topic:{} 生產消息:{},於{}消費", topic, msg, date);

Long addSuccess = stringRedisTemplate.opsForSet().add("topicList", topic);

if (addSuccess != null && addSuccess > 0) {

registTopic(topic);

}

stringRedisTemplate.opsForZSet().add(topic, msg, date.getTime());

}

public synchronized void consumer(String topic, Function<string> consumer) {/<string>

if (consumeTopics.contains(topic)) {

throw new RuntimeException("請勿重複監聽消費" + topic);

}

consumeTopics.add(topic);

int consumerPoolSize = 10;

ExecutorService consumerPool = Executors.newFixedThreadPool(consumerPoolSize);

for (int i = 0; i < consumerPoolSize; i++) {

consumerPool.submit(() -> {

do {

log.info("循環取消息:{}", topic);

String msg;

try {

msg = stringRedisTemplate.opsForList().rightPop(topic + "queue", 1000, TimeUnit.MINUTES);

} catch (QueryTimeoutException e) {

log.debug("監聽超時,重試中!");

continue;

}

log.info("{}監聽到消息:{}", topic, msg);

if (msg != null) {

Boolean consumerSuccess;

try {

consumerSuccess = consumer.apply(msg);

} catch (Exception e) {

log.warn("消費失敗!", e);

consumerSuccess = false;

}

//消費失敗,1分鐘後再重試

if (consumerSuccess == null || !consumerSuccess) {

log.info("消費失敗,重新放回隊列。msg:{},topic:{}", msg, topic);

produce(topic, msg, new Date(System.currentTimeMillis() + 60000));

}

}

} while (true);

});

}

}

}

單元測試

/**
 * Spring Boot tests that exercise {@link RedisDelayQueue} through the injected bean.
 */
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisDelayQueueTest {

    @Resource
    private RedisDelayQueue redisDelayQueue;

    /**
     * Publishes 30 messages spread round-robin over topic0..topic2; message {@code i}
     * becomes due {@code i} seconds from now.
     */
    @Test
    public void produce() {
        int index = 0;
        while (index < 30) {
            String topic = "topic" + index % 3;
            String payload = "hello message" + index;
            Date dueAt = new Date(System.currentTimeMillis() + index * 1000);
            redisDelayQueue.produce(topic, payload, dueAt);
            index++;
        }
    }

    /**
     * Registers a logging consumer on topic0, topic1 and topic2, then keeps the test
     * alive for 10 minutes so the worker threads can receive messages.
     */
    @Test
    public void consumer() throws InterruptedException {
        for (int n = 0; n < 3; n++) {
            final String topic = "topic" + n;
            redisDelayQueue.consumer(topic, (msg) -> {
                log.info("topic【{}】收到消息:{}", topic, msg);
                return true;
            });
        }
        TimeUnit.MINUTES.sleep(10);
    }
}


分享到:


相關文章: