Przeglądaj źródła

负载均衡时消息重复推送的问题

suxiaoyang 6 lat temu
rodzic
commit
5a34adc186

+ 41 - 16
svr/svr-base/src/main/java/com/yihu/jw/base/activemq/ConsumerRunner.java

@ -9,6 +9,7 @@ import org.springframework.util.Assert;
import org.springframework.web.client.RestTemplate;
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.util.Set;
/**
@ -21,36 +22,59 @@ public class ConsumerRunner implements Runnable, ExceptionListener {
    private final String topic;
    private Set<String> pushUrl;
    private TopicConnection topicConnection;
    private QueueConnection connection;
    private Session session;
    private MessageConsumer consumer;
    private RestTemplate restTemplate;
    public ConsumerRunner(String topic, Set<String> pushUrl) throws Exception {
    public ConsumerRunner(String topic, Set<String> pushUrl) {
        Assert.notNull(topic, "Topic cannot be null");
        this.topic = topic;
        this.pushUrl = pushUrl;
        init();
    }
    private void init() throws Exception {
    private void init() {
        try {
            ActiveMQConnectionFactory connectionFactory = SpringContext.getService(ActiveMQConnectionFactory.class);
            // Create a Connection
            connection = connectionFactory.createQueueConnection();
            connection.start();
            connection.setExceptionListener(this);
            // Create a Session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue(topic);
            // Create a MessageConsumer from the Session to the Topic or Queue
            consumer = session.createConsumer(destination);
            restTemplate = new RestTemplate();
        } catch (JMSException e) {
            LOGGER.error("Failed to init ConsumerRunner", e);
        }
    }
    private void recover() throws JMSException {
        ActiveMQConnectionFactory connectionFactory = SpringContext.getService(ActiveMQConnectionFactory.class);
        // Create a Connection
        topicConnection = connectionFactory.createTopicConnection();
        topicConnection.start();
        topicConnection.setExceptionListener(this);
        connection = connectionFactory.createQueueConnection();
        connection.start();
        connection.setExceptionListener(this);
        // Create a Session
        Session session = topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination (Topic or Queue)
        Destination destination = session.createTopic(topic);
        Destination destination = session.createQueue(topic);
        // Create a MessageConsumer from the Session to the Topic or Queue
        consumer = session.createConsumer(destination);
        restTemplate = new RestTemplate();
        if (null == restTemplate) {
            restTemplate = new RestTemplate();
        }
    }
    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
                // Wait for a message
                Message message = consumer.receive(1000);
                if (message != null) {
@ -70,18 +94,19 @@ public class ConsumerRunner implements Runnable, ExceptionListener {
                }
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
                if (e instanceof IllegalStateException) {
                    try {
                        recover();
                    } catch (JMSException jme) {
                        LOGGER.error("Failed to recover ConsumerRunner", jme);
                    }
                }
            }
        }
    }
    @Override
    public void onException(JMSException e) {
        LOGGER.error("Trying to recover from JMS Connection exception", e);
        try {
            topicConnection.close();
            topicConnection.start();
        } catch (Exception ex) {
            LOGGER.error("Failed to recover JMS Connection", ex);
        }
        LOGGER.error("ConsumerRunner onException", e);
    }
}