PubSubMessageJob.java 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package com.yihu.ehr.redis.pubsub;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import com.yihu.ehr.profile.queue.RedisCollection;
  4. import com.yihu.ehr.redis.pubsub.entity.RedisMqChannel;
  5. import com.yihu.ehr.redis.pubsub.entity.RedisMqMessageLog;
  6. import com.yihu.ehr.redis.pubsub.service.RedisMqChannelService;
  7. import com.yihu.ehr.redis.pubsub.service.RedisMqMessageLogService;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.data.redis.core.RedisTemplate;
  12. import org.springframework.scheduling.annotation.Scheduled;
  13. import org.springframework.stereotype.Component;
  14. import javax.annotation.Resource;
  15. import java.io.IOException;
  16. import java.util.Map;
  17. /**
  18. * 定时定量处理消息,包括:
  19. * - 发送消息。
  20. * - 重试订阅失败的消息。
  21. * - 处理订阅成功的消息。
  22. *
  23. * @author 张进军
  24. * @date 2018/5/7 14:01
  25. */
  26. @Component
  27. public class PubSubMessageJob {
  28. private Logger logger = LoggerFactory.getLogger(PubSubMessageJob.class);
  29. // 最大次数尝试重发订阅失败消息
  30. private final int maxFailedNum = 3;
  31. // 每次定时最多操作的消息数量
  32. private final int onceNum = 1000;
  33. @Autowired
  34. RedisMqChannelService redisMqChannelService;
  35. @Autowired
  36. RedisMqMessageLogService redisMqMessageLogService;
  37. @Resource
  38. RedisTemplate<String, Object> redisTemplate;
  39. @Autowired
  40. ObjectMapper objectMapper;
  41. /*
  42. * 发送消息
  43. */
  44. @Scheduled(initialDelay = 1000, fixedDelay = 5000)
  45. public void send() {
  46. try {
  47. int num = 0;
  48. while (true) {
  49. Object msgObj = redisTemplate.opsForList().rightPop(RedisCollection.PUB_WAITING_MESSAGES);
  50. if (msgObj == null) {
  51. break;
  52. }
  53. Map<String, Object> messageMap = objectMapper.readValue(String.valueOf(msgObj), Map.class);
  54. String channel = messageMap.get("channel").toString();
  55. // 发布消息
  56. redisTemplate.convertAndSend(channel, objectMapper.writeValueAsString(messageMap));
  57. // 累计入列数
  58. RedisMqChannel afterChannel = updateChannelQueueNumber(channel, "Enqueued");
  59. num++;
  60. if (num == onceNum) {
  61. break;
  62. }
  63. }
  64. } catch (IOException e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. /*
  69. * 重试订阅失败的消息
  70. */
  71. @Scheduled(initialDelay = 1000, fixedDelay = 5000)
  72. public void resend() {
  73. try {
  74. int num = 0;
  75. while (true) {
  76. Object msgObj = redisTemplate.opsForList().rightPop(RedisCollection.SUB_FAILED_MESSAGES);
  77. if (msgObj == null) {
  78. break;
  79. }
  80. boolean valid = true;
  81. RedisMqMessageLog cacheMessageLog = objectMapper.readValue(String.valueOf(msgObj), RedisMqMessageLog.class);
  82. if (cacheMessageLog.getFailedNum() == 0) {
  83. // 头次订阅失败,则保存到数据库中
  84. cacheMessageLog.setFailedNum(1);
  85. redisMqMessageLogService.save(cacheMessageLog);
  86. } else if (cacheMessageLog.getFailedNum() == -1) {
  87. // 累计订阅失败次数
  88. RedisMqMessageLog dbMessageLog = redisMqMessageLogService.getById(cacheMessageLog.getId());
  89. if (dbMessageLog.getFailedNum() >= maxFailedNum) {
  90. // 大于等于最大尝试失败数,则不再重试。
  91. valid = false;
  92. } else {
  93. dbMessageLog.setFailedNum(dbMessageLog.getFailedNum() + 1);
  94. dbMessageLog.setErrorMsg(cacheMessageLog.getErrorMsg());
  95. redisMqMessageLogService.save(dbMessageLog);
  96. }
  97. }
  98. if (valid) {
  99. // 将消息加入到待发缓存集合中
  100. redisMqChannelService.addToPubWaitingMessage(cacheMessageLog.getPublisherAppId(),
  101. cacheMessageLog.getChannel(), cacheMessageLog.getMessage(), cacheMessageLog.getId());
  102. num++;
  103. if (num == onceNum) {
  104. break;
  105. }
  106. }
  107. }
  108. } catch (IOException e) {
  109. e.printStackTrace();
  110. }
  111. }
  112. /*
  113. * 处理订阅成功的消息。
  114. * 累计出列数、更新重试成功的订阅失败消息的状态。
  115. */
  116. @Scheduled(initialDelay = 1000, fixedDelay = 5000)
  117. public void update() {
  118. try {
  119. int num = 0;
  120. while (true) {
  121. Object msgObj = redisTemplate.opsForList().rightPop(RedisCollection.SUB_SUCCESSFUL_MESSAGES);
  122. if (msgObj == null) {
  123. break;
  124. }
  125. RedisMqMessageLog cacheMessageLog = objectMapper.readValue(String.valueOf(msgObj), RedisMqMessageLog.class);
  126. RedisMqMessageLog dbMessageLog = redisMqMessageLogService.getById(cacheMessageLog.getId());
  127. if (dbMessageLog != null && dbMessageLog.getStatus() == 0) {
  128. // 更新重试成功的订阅失败消息的状态
  129. redisMqMessageLogService.updateStatus(cacheMessageLog.getId(), 1);
  130. }
  131. // 累计出列数
  132. RedisMqChannel afterChannel = updateChannelQueueNumber(cacheMessageLog.getChannel(), "Dequeued");
  133. num++;
  134. if (num == onceNum) {
  135. break;
  136. }
  137. }
  138. } catch (IOException e) {
  139. e.printStackTrace();
  140. }
  141. }
  142. /**
  143. * 累计 channel 的出入列数
  144. *
  145. * @param channel 队列编码
  146. * @param type 出入列类型标识符
  147. * @return RedisMqChannel
  148. */
  149. private synchronized RedisMqChannel updateChannelQueueNumber(String channel, String type) {
  150. RedisMqChannel mqChannelAfter = null;
  151. RedisMqChannel mqChannel = redisMqChannelService.findByChannel(channel);
  152. if ("Dequeued".equals(type)) {
  153. // 累计出列数
  154. mqChannel.setDequeuedNum(mqChannel.getDequeuedNum() + 1);
  155. mqChannelAfter = redisMqChannelService.save(mqChannel);
  156. } else if ("Enqueued".equals(type)) {
  157. // 累计入列数
  158. mqChannel.setEnqueuedNum(mqChannel.getEnqueuedNum() + 1);
  159. mqChannelAfter = redisMqChannelService.save(mqChannel);
  160. }
  161. return mqChannelAfter;
  162. }
  163. }