Browse Source

es和mq相关代码注释,这个版本不上

yeshijie 7 years ago
parent
commit
1f3b67948f

+ 90 - 90
patient-co/patient-co-wlyy-job/src/main/java/com/yihu/wlyy/activemq/StartListener.java

@ -1,90 +1,90 @@
package com.yihu.wlyy.activemq;
import com.yihu.wlyy.util.SpringUtil;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.jms.MessageListener;
import java.util.HashMap;
import java.util.Map;
/**
 * Created by chenweida on 2017/9/9.
 */
@Component
public class StartListener {
    private static Map<String, DefaultMessageListenerContainer> holder = new HashMap<String, DefaultMessageListenerContainer>();
    private Logger logger = LoggerFactory.getLogger(StartListener.class);
    @Value("${activemq.queue.healtHarticleQueue}")
    private String healtHarticleQueue;
    @Value("${activemq.consumers.count}")
    private Integer count;
    @Autowired
    private HealthArtListener healthArtListener;
    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;
    @PostConstruct
    public void startListener() {
        startHealthArticQueueListener();
    }
    /**
     * 启动健康教育消费
     */
    private void startHealthArticQueueListener() {
        try {
            //启动监听
            startReceiverByQueueName(healthArtListener, healtHarticleQueue);
            logger.info("start HealthArtListener success");
        } catch (Exception e) {
            logger.error("start HealthArtListener error");
        }
    }
    /**
     * 启动
     *
     * @param receiver
     * @param queueName
     */
    private void startReceiverByQueueName(MessageListener receiver, String queueName) {
        if (holder.get(queueName) != null) {
            return;
        }
        ActiveMQQueue destination = new ActiveMQQueue(queueName);
        DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
        // 监听容器属性的配置
        listenerContainer.setConnectionFactory(cachingConnectionFactory);
        // 设置目的地
        listenerContainer.setDestination(destination);
        // 设置监听器
        listenerContainer.setMessageListener(receiver);
        // 设置消费者集群数
        int consumers = count;
        listenerContainer.setConcurrentConsumers(2);
        listenerContainer.setMaxConcurrentConsumers(consumers);
        // 设置监听队列还是主题 默认是队列
        listenerContainer.setPubSubNoLocal(false);
        // listenerContainer.setAcceptMessagesWhileStopping(true);
        // 设置应答模式 默认是4
        listenerContainer.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        // 设置是否启动事物 默认不开启
        listenerContainer.setSessionTransacted(false);
        // 将监听容器保存在holder中
        holder.put(queueName, listenerContainer);
        listenerContainer.initialize();
        // 启动监听
        listenerContainer.start();
    }
}
//package com.yihu.wlyy.activemq;
//
//import com.yihu.wlyy.util.SpringUtil;
//import org.apache.activemq.ActiveMQSession;
//import org.apache.activemq.command.ActiveMQQueue;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.jms.connection.CachingConnectionFactory;
//import org.springframework.jms.listener.DefaultMessageListenerContainer;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.PostConstruct;
//import javax.jms.MessageListener;
//import java.util.HashMap;
//import java.util.Map;
//
///**
// * Created by chenweida on 2017/9/9.
// */
//@Component
//public class StartListener {
//    private static Map<String, DefaultMessageListenerContainer> holder = new HashMap<String, DefaultMessageListenerContainer>();
//    private Logger logger = LoggerFactory.getLogger(StartListener.class);
//
//    @Value("${activemq.queue.healtHarticleQueue}")
//    private String healtHarticleQueue;
//    @Value("${activemq.consumers.count}")
//    private Integer count;
//    @Autowired
//    private HealthArtListener healthArtListener;
//    @Autowired
//    private CachingConnectionFactory cachingConnectionFactory;
//
//    @PostConstruct
//    public void startListener() {
//        startHealthArticQueueListener();
//    }
//
//    /**
//     * 启动健康教育消费
//     */
//    private void startHealthArticQueueListener() {
//        try {
//            //启动监听
//            startReceiverByQueueName(healthArtListener, healtHarticleQueue);
//            logger.info("start HealthArtListener success");
//        } catch (Exception e) {
//            logger.error("start HealthArtListener error");
//        }
//    }
//
//    /**
//     * 启动
//     *
//     * @param receiver
//     * @param queueName
//     */
//    private void startReceiverByQueueName(MessageListener receiver, String queueName) {
//        if (holder.get(queueName) != null) {
//            return;
//        }
//        ActiveMQQueue destination = new ActiveMQQueue(queueName);
//
//        DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
//        // 监听容器属性的配置
//        listenerContainer.setConnectionFactory(cachingConnectionFactory);
//        // 设置目的地
//        listenerContainer.setDestination(destination);
//        // 设置监听器
//        listenerContainer.setMessageListener(receiver);
//        // 设置消费者集群数
//        int consumers = count;
//        listenerContainer.setConcurrentConsumers(2);
//        listenerContainer.setMaxConcurrentConsumers(consumers);
//        // 设置监听队列还是主题 默认是队列
//        listenerContainer.setPubSubNoLocal(false);
//        // listenerContainer.setAcceptMessagesWhileStopping(true);
//        // 设置应答模式 默认是4
//        listenerContainer.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
//        // 设置是否启动事物 默认不开启
//        listenerContainer.setSessionTransacted(false);
//        // 将监听容器保存在holder中
//        holder.put(queueName, listenerContainer);
//        listenerContainer.initialize();
//        // 启动监听
//        listenerContainer.start();
//    }
//}

+ 70 - 70
patient-co/patient-co-wlyy-job/src/main/java/com/yihu/wlyy/config/ActiveMQConfig.java

@ -1,70 +1,70 @@
package com.yihu.wlyy.config;
import com.yihu.wlyy.activemq.HealthArtListener;
import com.yihu.wlyy.util.SpringUtil;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import javax.annotation.PostConstruct;
import javax.jms.MessageListener;
import java.util.HashMap;
import java.util.Map;
/**
 * Created by chenweida on 2017/9/9.
 * 生产者配置
 */
@EnableJms
@Configuration
public class ActiveMQConfig {
   @Value("${activemq.username}")
    private String username;
    @Value("${activemq.password}")
    private String password;
    @Value("${activemq.url}")
    private String url;
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(username, password, url);
        //设置异步发送
        activeMQConnectionFactory.setUseAsyncSend(true);
        return activeMQConnectionFactory;
    }
    /**
     * 缓存session链接
     *
     * @return
     */
    @Bean
    @Primary
    public CachingConnectionFactory CachingConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        //目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory
        cachingConnectionFactory.setTargetConnectionFactory(activeMQConnectionFactory());
        //Session缓存数量,这里属性也可以直接在这里配置
        cachingConnectionFactory.setSessionCacheSize(100);
        return cachingConnectionFactory;
    }
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(CachingConnectionFactory());
        return jmsTemplate;
    }
}
//package com.yihu.wlyy.config;
//
//import com.yihu.wlyy.activemq.HealthArtListener;
//import com.yihu.wlyy.util.SpringUtil;
//import org.apache.activemq.ActiveMQConnectionFactory;
//import org.apache.activemq.ActiveMQSession;
//import org.apache.activemq.command.ActiveMQQueue;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.context.annotation.Primary;
//import org.springframework.jms.annotation.EnableJms;
//import org.springframework.jms.connection.CachingConnectionFactory;
//import org.springframework.jms.core.JmsTemplate;
//import org.springframework.jms.listener.DefaultMessageListenerContainer;
//
//import javax.annotation.PostConstruct;
//import javax.jms.MessageListener;
//import java.util.HashMap;
//import java.util.Map;
//
///**
// * Created by chenweida on 2017/9/9.
// * 生产者配置
// */
//@EnableJms
//@Configuration
//public class ActiveMQConfig {
//   @Value("${activemq.username}")
//    private String username;
//    @Value("${activemq.password}")
//    private String password;
//    @Value("${activemq.url}")
//    private String url;
//
//    @Bean
//    public ActiveMQConnectionFactory activeMQConnectionFactory() {
//        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(username, password, url);
//        //设置异步发送
//        activeMQConnectionFactory.setUseAsyncSend(true);
//        return activeMQConnectionFactory;
//    }
//
//    /**
//     * 缓存session链接
//     *
//     * @return
//     */
//    @Bean
//    @Primary
//    public CachingConnectionFactory CachingConnectionFactory() {
//        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
//        //目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory
//        cachingConnectionFactory.setTargetConnectionFactory(activeMQConnectionFactory());
//        //Session缓存数量,这里属性也可以直接在这里配置
//        cachingConnectionFactory.setSessionCacheSize(100);
//        return cachingConnectionFactory;
//    }
//
//    @Bean
//    public JmsTemplate jmsTemplate() {
//        JmsTemplate jmsTemplate = new JmsTemplate();
//        jmsTemplate.setConnectionFactory(CachingConnectionFactory());
//
//        return jmsTemplate;
//    }
//
//}

+ 93 - 93
patient-co/patient-co-wlyy-job/src/main/java/com/yihu/wlyy/config/es/ElasticFactory.java

@ -1,93 +1,93 @@
package com.yihu.wlyy.config.es;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
/**
 * Created by chenweida on 2017/6/5.
 */
@Component
public class ElasticFactory {
    private static JestClientFactory factory = null;
    @Value("${es.host}")
    private String esHost;
    @Value("${es.port}")
    private String port;
    @Value("${es.tPort}")
    private String tPort;
    @Value("${es.clusterName}")
    private String clusterName;
//-----------------------------------jestClient----------------------------------------
    /**
     * @param "http://localhost:9200"
     * @return
     */
    @PostConstruct
    public JestClient getJestClient() {
        synchronized (ElasticFactory.class) {
            if (factory == null) {
                //初始化链接
                init();
            }
        }
        return factory.getObject();
    }
    /**
     * 初始化链接
     */
    public synchronized void init() {
        // Construct a new Jest client according to configuration via factory
        factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig
                .Builder("http://" + esHost + ":" + port)
                .multiThreaded(true)
                .maxTotalConnection(50)// 最大链接
                .maxConnectionIdleTime(120, TimeUnit.SECONDS)//链接等待时间
                .connTimeout(30000)
                .discoveryEnabled(true)
                .readTimeout(30000)//30秒
                .build());//得到链接
    }
    //-----------------------------------TransportClient----------------------------------------
    private Client transportClient;
    public Client getTransportClient() {
        try {
            initTranClient();
            return transportClient;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    private synchronized void initTranClient() throws UnknownHostException {
        if (transportClient == null) {
            Settings settings = Settings.settingsBuilder()
                    .put("client.transport.sniff", true)//开启嗅探功能
                    .put("cluster.name", StringUtils.isEmpty(clusterName) ? "jkzl" : clusterName)//默认集群名字是jkzl
                    .build();
            transportClient = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHost), Integer.valueOf(tPort)));
        }
    }
}
//package com.yihu.wlyy.config.es;
//
//import io.searchbox.client.JestClient;
//import io.searchbox.client.JestClientFactory;
//import io.searchbox.client.config.HttpClientConfig;
//import org.elasticsearch.client.Client;
//import org.elasticsearch.client.transport.TransportClient;
//import org.elasticsearch.common.settings.Settings;
//import org.elasticsearch.common.transport.InetSocketTransportAddress;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.stereotype.Component;
//import org.springframework.util.StringUtils;
//
//import javax.annotation.PostConstruct;
//import java.net.InetAddress;
//import java.net.UnknownHostException;
//import java.util.concurrent.TimeUnit;
//
///**
// * Created by chenweida on 2017/6/5.
// */
//@Component
//public class ElasticFactory {
//    private static JestClientFactory factory = null;
//
//    @Value("${es.host}")
//    private String esHost;
//    @Value("${es.port}")
//    private String port;
//    @Value("${es.tPort}")
//    private String tPort;
//    @Value("${es.clusterName}")
//    private String clusterName;
////-----------------------------------jestClient----------------------------------------
//
//    /**
//     * @param "http://localhost:9200"
//     * @return
//     */
//    @PostConstruct
//    public JestClient getJestClient() {
//        synchronized (ElasticFactory.class) {
//            if (factory == null) {
//                //初始化链接
//                init();
//            }
//        }
//        return factory.getObject();
//    }
//
//    /**
//     * 初始化链接
//     */
//    public synchronized void init() {
//
//        // Construct a new Jest client according to configuration via factory
//        factory = new JestClientFactory();
//        factory.setHttpClientConfig(new HttpClientConfig
//                .Builder("http://" + esHost + ":" + port)
//                .multiThreaded(true)
//                .maxTotalConnection(50)// 最大链接
//                .maxConnectionIdleTime(120, TimeUnit.SECONDS)//链接等待时间
//                .connTimeout(30000)
//                .discoveryEnabled(true)
//                .readTimeout(30000)//30秒
//                .build());//得到链接
//    }
//
//    //-----------------------------------TransportClient----------------------------------------
//    private Client transportClient;
//
//    public Client getTransportClient() {
//        try {
//            initTranClient();
//            return transportClient;
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
//        return null;
//    }
//
//    private synchronized void initTranClient() throws UnknownHostException {
//        if (transportClient == null) {
//            Settings settings = Settings.settingsBuilder()
//                    .put("client.transport.sniff", true)//开启嗅探功能
//                    .put("cluster.name", StringUtils.isEmpty(clusterName) ? "jkzl" : clusterName)//默认集群名字是jkzl
//                    .build();
//
//            transportClient = TransportClient.builder().settings(settings).build()
//                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHost), Integer.valueOf(tPort)));
//        }
//    }
//}

+ 49 - 49
patient-co/patient-co-wlyy-job/src/main/java/com/yihu/wlyy/config/es/ElastricSearchSave.java

@ -1,49 +1,49 @@
package com.yihu.wlyy.config.es;
import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Index;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.List;
;
/**
 * Created by chenweida on 2017/6/2.
 */
@Component
@Scope("prototype")
public class ElastricSearchSave {
    private Logger logger = LoggerFactory.getLogger(ElastricSearchSave.class);
    @Autowired
    private ElasticFactory elasticFactory;
    public Boolean save(List  sms,String esIndex,String esType) {
        try {
            //得到链接
            JestClient jestClient = elasticFactory.getJestClient();
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
            for (Object obj : sms) {
                Index index = new Index.Builder(obj).build();
                bulk.addAction(index);
            }
            BulkResult br = jestClient.execute(bulk.build());
            logger.info("save data count:" + sms.size());
            logger.info("save flag:" + br.isSucceeded());
            return br.isSucceeded();
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(" save error :" + e.getMessage());
        }
        return null;
    }
}
//package com.yihu.wlyy.config.es;
//
//import io.searchbox.client.JestClient;
//import io.searchbox.core.Bulk;
//import io.searchbox.core.BulkResult;
//import io.searchbox.core.Index;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.context.annotation.Scope;
//import org.springframework.stereotype.Component;
//
//import java.util.List;
//
//;
//
///**
// * Created by chenweida on 2017/6/2.
// */
//@Component
//@Scope("prototype")
//public class ElastricSearchSave {
//
//    private Logger logger = LoggerFactory.getLogger(ElastricSearchSave.class);
//    @Autowired
//    private ElasticFactory elasticFactory;
//
//    public Boolean save(List  sms,String esIndex,String esType) {
//        try {
//            //得到链接
//            JestClient jestClient = elasticFactory.getJestClient();
//
//            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
//            for (Object obj : sms) {
//                Index index = new Index.Builder(obj).build();
//                bulk.addAction(index);
//            }
//            BulkResult br = jestClient.execute(bulk.build());
//            logger.info("save data count:" + sms.size());
//            logger.info("save flag:" + br.isSucceeded());
//            return br.isSucceeded();
//        } catch (Exception e) {
//            e.printStackTrace();
//            logger.error(" save error :" + e.getMessage());
//        }
//        return null;
//    }
//
//}