Browse Source

首次提交

chenweida 7 years ago
parent
commit
b38b8b45ff

+ 143 - 0
common-activemq-starter/src/main/java/com/yihu/base/activemq/ActiveMQHelper.java

@ -0,0 +1,143 @@
package com.yihu.base.activemq;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import javax.jms.MessageListener;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Created by chenweida on 2018/2/11.
 */
public class ActiveMQHelper {
    private Logger logger = LoggerFactory.getLogger(ActiveMQHelper.class);
    private static Map<String, BlockingQueue<DefaultMessageListenerContainer>> holder = new HashMap<String, BlockingQueue<DefaultMessageListenerContainer>>();
    private CachingConnectionFactory cachingConnectionFactory;
    private JmsTemplate jmsTemplate;
    /**
     * 往消息队列发消息
     *
     * @param queueName
     * @param message
     * @return
     * @throws Exception
     */
    public void send(String queueName, Object message) {
        jmsTemplate.convertAndSend(queueName, message);
    }
    /**
     * 动态新增一个监听
     *
     * @param queueName
     * @param messageListener
     * @return
     * @throws Exception
     */
    public synchronized Boolean addListener(String queueName, MessageListener messageListener) {
        try {
            startReceiverByQueueName(messageListener, queueName);
            return true;
        } catch (Exception e) {
            logger.error("新增监听失败:" + e.getMessage());
            return false;
        }
    }
    /**
     * 动态关闭监听
     *
     * @param queueName
     * @return
     */
    public synchronized Boolean closeListener(
            String queueName) {
        try {
            while (true) {
                BlockingQueue<DefaultMessageListenerContainer> defaultMessageListenerContainers = holder.get(queueName);
                if (defaultMessageListenerContainers == null || defaultMessageListenerContainers.size() == 0) {
                    logger.error("关闭失败:消费者不存在或者已经关闭");
                    return false;
                }
                //每次关闭队列头 先进先出
                DefaultMessageListenerContainer defaultMessageListenerContainer = defaultMessageListenerContainers.poll();
                defaultMessageListenerContainer.shutdown();
                if (defaultMessageListenerContainer.isActive() == false) {
                    //如果队列长度是0 那么移除map
                    if (defaultMessageListenerContainers.size() == 0) {
                        holder.remove(queueName);
                    }
                    break;
                }
            }
            return true;
        } catch (Exception e) {
            logger.error("新增监听失败:" + e.getMessage());
            return false;
        }
    }
    private void startReceiverByQueueName(MessageListener receiver, String queueName) throws InterruptedException {
        ActiveMQQueue destination = new ActiveMQQueue(queueName);
        DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
        // 监听容器属性的配置
        listenerContainer.setConnectionFactory(cachingConnectionFactory);
        // 设置目的地
        listenerContainer.setDestination(destination);
        // 设置监听器
        listenerContainer.setMessageListener(receiver);
        // 设置消费者集群数
        int consumers = Integer.valueOf(2);
        listenerContainer.setConcurrentConsumers(consumers);
        // 设置监听队列还是主题 默认是队列
        listenerContainer.setPubSubNoLocal(false);
        // 设置应答模式 默认是4
        listenerContainer.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        // 设置是否启动事物 默认不开启
        listenerContainer.setSessionTransacted(false);
        // 将监听容器保存在holder中
        BlockingQueue basket = holder.get(queueName);
        if (basket == null) {
            basket = new ArrayBlockingQueue(5000);
            basket.put(listenerContainer);
        }
        holder.put(queueName, basket);
        listenerContainer.initialize();
        // 启动监听
        listenerContainer.start();
    }
    public CachingConnectionFactory getCachingConnectionFactory() {
        return cachingConnectionFactory;
    }
    public void setCachingConnectionFactory(CachingConnectionFactory cachingConnectionFactory) {
        this.cachingConnectionFactory = cachingConnectionFactory;
    }
    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
}

+ 311 - 0
common-activemq-starter/src/main/java/com/yihu/base/activemq/config/jms/ActiveMQProperties.java

@ -0,0 +1,311 @@
package com.yihu.base.activemq.config.jms;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
 * Created by chenweida on 2018/1/23.
 */
@Configuration
@ConfigurationProperties(prefix = "spring.activemq")
public class ActiveMQProperties {
    /**
     * URL of the ActiveMQ broker. Auto-generated by default.
     */
    private String brokerUrl;
    /**
     * Specify if the default broker URL should be in memory. Ignored if an explicit
     * broker has been specified.
     */
    private boolean inMemory = true;
    /**
     * Login user of the broker.
     */
    private String user;
    /**
     * Login password of the broker.
     */
    private String password;
    /**
     * Time to wait, in milliseconds, before considering a close complete.
     */
    private int closeTimeout = 15000;
    /**
     * Do not stop message delivery before re-delivering messages from a rolled back
     * transaction. This implies that message order will not be preserved when this is
     * enabled.
     */
    private boolean nonBlockingRedelivery = false;
    /**
     * Time to wait, in milliseconds, on Message sends for a response. Set it to 0 to
     * indicate to wait forever.
     */
    private int sendTimeout = 0;
    private Pool pool = new Pool();
    private Packages packages = new Packages();
    public String getBrokerUrl() {
        return this.brokerUrl;
    }
    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }
    public boolean isInMemory() {
        return this.inMemory;
    }
    public void setInMemory(boolean inMemory) {
        this.inMemory = inMemory;
    }
    public String getUser() {
        return this.user;
    }
    public void setUser(String user) {
        this.user = user;
    }
    public String getPassword() {
        return this.password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public int getCloseTimeout() {
        return this.closeTimeout;
    }
    public void setCloseTimeout(int closeTimeout) {
        this.closeTimeout = closeTimeout;
    }
    public boolean isNonBlockingRedelivery() {
        return this.nonBlockingRedelivery;
    }
    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
        this.nonBlockingRedelivery = nonBlockingRedelivery;
    }
    public int getSendTimeout() {
        return this.sendTimeout;
    }
    public void setSendTimeout(int sendTimeout) {
        this.sendTimeout = sendTimeout;
    }
    public Pool getPool() {
        return this.pool;
    }
    public void setPool(Pool pool) {
        this.pool = pool;
    }
    public Packages getPackages() {
        return this.packages;
    }
    public static class Pool {
        /**
         * Whether a PooledConnectionFactory should be created instead of a regular
         * ConnectionFactory.
         */
        private boolean enabled;
        /**
         * Block when a connection is requested and the pool is full. Set it to false to
         * throw a "JMSException" instead.
         */
        private boolean blockIfFull = true;
        /**
         * Blocking period, in milliseconds, before throwing an exception if the pool is
         * still full.
         */
        private long blockIfFullTimeout = -1;
        /**
         * Create a connection on startup. Can be used to warm-up the pool on startup.
         */
        private boolean createConnectionOnStartup = true;
        /**
         * Connection expiration timeout in milliseconds.
         */
        private long expiryTimeout = 0;
        /**
         * Connection idle timeout in milliseconds.
         */
        private int idleTimeout = 30000;
        /**
         * Maximum number of pooled connections.
         */
        private int maxConnections = 1;
        /**
         * Maximum number of active sessions per connection.
         */
        private int maximumActiveSessionPerConnection = 500;
        /**
         * Reset the connection when a "JMXException" occurs.
         */
        private boolean reconnectOnException = true;
        /**
         * Time to sleep, in milliseconds, between runs of the idle connection eviction
         * thread. When negative, no idle connection eviction thread runs.
         */
        private long timeBetweenExpirationCheck = -1;
        /**
         * Use only one anonymous "MessageProducer" instance. Set it to false to create
         * one "MessageProducer" every time one is required.
         */
        private boolean useAnonymousProducers = true;
        public boolean isEnabled() {
            return this.enabled;
        }
        public void setEnabled(boolean enabled) {
            this.enabled = enabled;
        }
        public boolean isBlockIfFull() {
            return this.blockIfFull;
        }
        public void setBlockIfFull(boolean blockIfFull) {
            this.blockIfFull = blockIfFull;
        }
        public long getBlockIfFullTimeout() {
            return this.blockIfFullTimeout;
        }
        public void setBlockIfFullTimeout(long blockIfFullTimeout) {
            this.blockIfFullTimeout = blockIfFullTimeout;
        }
        public boolean isCreateConnectionOnStartup() {
            return this.createConnectionOnStartup;
        }
        public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
            this.createConnectionOnStartup = createConnectionOnStartup;
        }
        public long getExpiryTimeout() {
            return this.expiryTimeout;
        }
        public void setExpiryTimeout(long expiryTimeout) {
            this.expiryTimeout = expiryTimeout;
        }
        public int getIdleTimeout() {
            return this.idleTimeout;
        }
        public void setIdleTimeout(int idleTimeout) {
            this.idleTimeout = idleTimeout;
        }
        public int getMaxConnections() {
            return this.maxConnections;
        }
        public void setMaxConnections(int maxConnections) {
            this.maxConnections = maxConnections;
        }
        public int getMaximumActiveSessionPerConnection() {
            return this.maximumActiveSessionPerConnection;
        }
        public void setMaximumActiveSessionPerConnection(
                int maximumActiveSessionPerConnection) {
            this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
        }
        public boolean isReconnectOnException() {
            return this.reconnectOnException;
        }
        public void setReconnectOnException(boolean reconnectOnException) {
            this.reconnectOnException = reconnectOnException;
        }
        public long getTimeBetweenExpirationCheck() {
            return this.timeBetweenExpirationCheck;
        }
        public void setTimeBetweenExpirationCheck(long timeBetweenExpirationCheck) {
            this.timeBetweenExpirationCheck = timeBetweenExpirationCheck;
        }
        public boolean isUseAnonymousProducers() {
            return this.useAnonymousProducers;
        }
        public void setUseAnonymousProducers(boolean useAnonymousProducers) {
            this.useAnonymousProducers = useAnonymousProducers;
        }
    }
    public static class Packages {
        /**
         * Trust all packages.
         */
        private Boolean trustAll;
        /**
         * Comma-separated list of specific packages to trust (when not trusting all
         * packages).
         */
        private List<String> trusted = new ArrayList<String>();
        public Boolean getTrustAll() {
            return this.trustAll;
        }
        public void setTrustAll(Boolean trustAll) {
            this.trustAll = trustAll;
        }
        public List<String> getTrusted() {
            return this.trusted;
        }
        public void setTrusted(List<String> trusted) {
            this.trusted = trusted;
        }
    }
}

+ 69 - 0
common-activemq-starter/src/main/java/com/yihu/base/activemq/config/jms/JmsActivcemqConfig.java

@ -0,0 +1,69 @@
package com.yihu.base.activemq.config.jms;
import com.yihu.base.activemq.ActiveMQHelper;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
/**
 * Created by chenweida on 2018/1/23.
 */
@Configuration
@EnableJms
public class JmsActivcemqConfig {
    @Autowired
    private ActiveMQProperties activeMQProperties;
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(activeMQProperties.getBrokerUrl());
        connectionFactory.setUserName(activeMQProperties.getUser());
        connectionFactory.setPassword(activeMQProperties.getPassword());
        //设置异步发送
        connectionFactory.setUseAsyncSend(true);
        return connectionFactory;
    }
    /**
     * 缓存session链接
     *
     * @return
     */
    @Bean
    @Primary
    public CachingConnectionFactory CachingConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        //目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory
        cachingConnectionFactory.setTargetConnectionFactory(connectionFactory());
        //Session缓存数量,这里属性也可以直接在这里配置
        cachingConnectionFactory.setSessionCacheSize(100);
        return cachingConnectionFactory;
    }
    @Bean
    @Primary
    public JmsTemplate jmsQueueTemplate() {
        return new JmsTemplate(connectionFactory());
    }
    @Bean
    @Primary
    public ActiveMQHelper actifveMQHelper() {
        ActiveMQHelper actifveMQHelper = new ActiveMQHelper();
        actifveMQHelper.setCachingConnectionFactory(CachingConnectionFactory());
        actifveMQHelper.setJmsTemplate(jmsQueueTemplate());
        return actifveMQHelper;
    }
}

+ 3 - 0
common-activemq-starter/src/main/resources/META-INF/spring.factories

@ -0,0 +1,3 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.yihu.base.activemq.config.jms.ActiveMQProperties,\
  com.yihu.base.activemq.config.jms.JmsActivcemqConfig

+ 5 - 0
common-activemq-starter/src/main/resources/template.yml

@ -0,0 +1,5 @@
spring:
  activemq:
    broker-url: tcp://172.19.103.87:61616
    user: admin
    password: admin