ActiveMQHelper.java 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package com.yihu.base.activemq;
  2. import org.apache.activemq.ActiveMQSession;
  3. import org.apache.activemq.command.ActiveMQQueue;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.jms.connection.CachingConnectionFactory;
  7. import org.springframework.jms.core.JmsTemplate;
  8. import org.springframework.jms.listener.DefaultMessageListenerContainer;
  9. import javax.jms.MessageListener;
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. import java.util.concurrent.ArrayBlockingQueue;
  13. import java.util.concurrent.BlockingQueue;
  14. /**
  15. * Created by chenweida on 2018/2/11.
  16. */
  17. public class ActiveMQHelper {
  18. private Logger logger = LoggerFactory.getLogger(ActiveMQHelper.class);
  19. private static Map<String, BlockingQueue<DefaultMessageListenerContainer>> holder = new HashMap<String, BlockingQueue<DefaultMessageListenerContainer>>();
  20. private CachingConnectionFactory cachingConnectionFactory;
  21. private JmsTemplate jmsTemplate;
  22. /**
  23. * 往消息队列发消息
  24. *
  25. * @param queueName
  26. * @param message
  27. * @return
  28. * @throws Exception
  29. */
  30. public void send(String queueName, Object message) {
  31. jmsTemplate.convertAndSend(queueName, message);
  32. }
  33. /**
  34. * 动态新增一个监听
  35. *
  36. * @param queueName
  37. * @param messageListener
  38. * @return
  39. * @throws Exception
  40. */
  41. public synchronized Boolean addListener(String queueName, MessageListener messageListener) {
  42. try {
  43. startReceiverByQueueName(messageListener, queueName);
  44. return true;
  45. } catch (Exception e) {
  46. logger.error("新增监听失败:" + e.getMessage());
  47. return false;
  48. }
  49. }
  50. /**
  51. * 动态关闭监听
  52. *
  53. * @param queueName
  54. * @return
  55. */
  56. public synchronized Boolean closeListener(
  57. String queueName) {
  58. try {
  59. while (true) {
  60. BlockingQueue<DefaultMessageListenerContainer> defaultMessageListenerContainers = holder.get(queueName);
  61. if (defaultMessageListenerContainers == null || defaultMessageListenerContainers.size() == 0) {
  62. logger.error("关闭失败:消费者不存在或者已经关闭");
  63. return false;
  64. }
  65. //每次关闭队列头 先进先出
  66. DefaultMessageListenerContainer defaultMessageListenerContainer = defaultMessageListenerContainers.poll();
  67. defaultMessageListenerContainer.shutdown();
  68. if (defaultMessageListenerContainer.isActive() == false) {
  69. //如果队列长度是0 那么移除map
  70. if (defaultMessageListenerContainers.size() == 0) {
  71. holder.remove(queueName);
  72. }
  73. break;
  74. }
  75. }
  76. return true;
  77. } catch (Exception e) {
  78. logger.error("新增监听失败:" + e.getMessage());
  79. return false;
  80. }
  81. }
  82. private void startReceiverByQueueName(MessageListener receiver, String queueName) throws InterruptedException {
  83. ActiveMQQueue destination = new ActiveMQQueue(queueName);
  84. DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
  85. // 监听容器属性的配置
  86. listenerContainer.setConnectionFactory(cachingConnectionFactory);
  87. // 设置目的地
  88. listenerContainer.setDestination(destination);
  89. // 设置监听器
  90. listenerContainer.setMessageListener(receiver);
  91. // 设置消费者集群数
  92. int consumers = Integer.valueOf(2);
  93. listenerContainer.setConcurrentConsumers(consumers);
  94. // 设置监听队列还是主题 默认是队列
  95. listenerContainer.setPubSubNoLocal(false);
  96. // 设置应答模式 默认是4
  97. listenerContainer.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
  98. // 设置是否启动事物 默认不开启
  99. listenerContainer.setSessionTransacted(false);
  100. // 将监听容器保存在holder中
  101. BlockingQueue basket = holder.get(queueName);
  102. if (basket == null) {
  103. basket = new ArrayBlockingQueue(5000);
  104. basket.put(listenerContainer);
  105. }
  106. holder.put(queueName, basket);
  107. listenerContainer.initialize();
  108. // 启动监听
  109. listenerContainer.start();
  110. }
  111. public CachingConnectionFactory getCachingConnectionFactory() {
  112. return cachingConnectionFactory;
  113. }
  114. public void setCachingConnectionFactory(CachingConnectionFactory cachingConnectionFactory) {
  115. this.cachingConnectionFactory = cachingConnectionFactory;
  116. }
  117. public JmsTemplate getJmsTemplate() {
  118. return jmsTemplate;
  119. }
  120. public void setJmsTemplate(JmsTemplate jmsTemplate) {
  121. this.jmsTemplate = jmsTemplate;
  122. }
  123. }