ElasticSearchPool.java 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package com.yihu.elasticsearch;
  2. import com.alibaba.druid.pool.DruidDataSource;
  3. import com.alibaba.druid.pool.ElasticSearchDruidDataSourceFactory;
  4. import com.yihu.elasticsearch.config.ElasticSearchConfig;
  5. import org.elasticsearch.client.transport.TransportClient;
  6. import org.elasticsearch.common.settings.Settings;
  7. import org.elasticsearch.common.transport.InetSocketTransportAddress;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.beans.factory.config.ConfigurableBeanFactory;
  10. import org.springframework.context.annotation.Scope;
  11. import org.springframework.stereotype.Component;
  12. import org.springframework.util.StringUtils;
  13. import java.net.InetSocketAddress;
  14. import java.util.Properties;
  15. /**
  16. * Created by progr1mmer on 2018/1/4.
  17. */
  18. @Component
  19. @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
  20. public class ElasticSearchPool {
  21. private static volatile TransportClient transportClient;
  22. @Autowired
  23. private ElasticSearchConfig elasticSearchConfig;
  24. private TransportClient getTransportClient() {
  25. Settings settings = Settings.builder()
  26. .put("cluster.name", elasticSearchConfig.getClusterName())
  27. .put("client.transport.sniff", elasticSearchConfig.isClientTransportSniff())
  28. .build();
  29. String[] nodeArr = elasticSearchConfig.getClusterNodes().split(",");
  30. InetSocketTransportAddress[] socketArr = new InetSocketTransportAddress[nodeArr.length];
  31. for (int i = 0; i < socketArr.length; i++) {
  32. if (!StringUtils.isEmpty(nodeArr[i])) {
  33. String[] nodeInfo = nodeArr[i].split(":");
  34. socketArr[i] = new InetSocketTransportAddress(new InetSocketAddress(nodeInfo[0], new Integer(nodeInfo[1])));
  35. }
  36. }
  37. return TransportClient.builder().settings(settings).build().addTransportAddresses(socketArr);
  38. }
  39. /**
  40. * 1.TransportClient本身支持多线程的数据请求
  41. * 2.移除多个TransportClient的线程池支持,减少Socket链接
  42. * 3.基于多重检查的单例模式,兼顾安全和效率
  43. * 4.为提高效率,使用完毕后请勿进行 transportClient.close() 的关闭操作
  44. * @return
  45. */
  46. public TransportClient getClient() {
  47. if (transportClient != null) {
  48. if (transportClient.connectedNodes().isEmpty()) {
  49. synchronized (TransportClient.class) {
  50. if (transportClient.connectedNodes().isEmpty()) {
  51. transportClient = getTransportClient();
  52. }
  53. }
  54. }
  55. return transportClient;
  56. }
  57. synchronized (TransportClient.class) {
  58. if (null == transportClient) {
  59. transportClient = getTransportClient();
  60. }
  61. }
  62. return transportClient;
  63. }
  64. public DruidDataSource getDruidDataSource() throws Exception {
  65. Properties properties = new Properties();
  66. properties.put("url", "jdbc:elasticsearch://" + elasticSearchConfig.getClusterNodes() + "/");
  67. DruidDataSource druidDataSource = (DruidDataSource) ElasticSearchDruidDataSourceFactory
  68. .createDataSource(properties);
  69. druidDataSource.setInitialSize(1);
  70. return druidDataSource;
  71. }
  72. }