Browse Source

ES 更新

chenweida 7 years ago
parent
commit
cd0f37209f

+ 32 - 43
common-data-es-starter/src/main/java/com/yihu/base/es/config/ElasticFactory.java

@ -2,38 +2,29 @@ package com.yihu.base.es.config;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.ClientConfig;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.client.config.HttpClientConfig;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeUnit;
/**
/**
 * Created by chenweida on 2017/6/5.
 * Created by chenweida on 2017/6/5.
 */
 */
@Component
@Configuration
public class ElasticFactory {
public class ElasticFactory {
    private static JestClientFactory factory = null;
    private static JestClientFactory factory = null;
    @Value("${spring.data.elasticsearch.cluster-name}")
    private String clusterNames;
    @Value("${spring.data.elasticsearch.cluster-nodes}")
    private String clusterNodes; // 120.25.194.233:9300,120.25.194.233:9300,120.25.194.233:9300
    @Value("${spring.elasticsearch.jest.uris}")
    private String jestHost; // http://192.168.226.133:9200
    @Autowired
    private ElasticsearchTemplate elasticsearchTemplate;
    @Value("${spring.data.elasticsearch.cluster-nodes-jest}")
    private String esHost;//http://59.61.92.90:9065,http://59.61.92.90:9067
//-----------------------------------jestClient----------------------------------------
//-----------------------------------jestClient----------------------------------------
    /**
    /**
@ -50,51 +41,49 @@ public class ElasticFactory {
    /**
    /**
     * 初始化链接
     * 初始化链接
     * 9200
     */
     */
    public synchronized void init() {
    public synchronized void init() {
        String[] hostArray = esHost.split(",");
        // Construct a new Jest client according to configuration via factory
        // Construct a new Jest client according to configuration via factory
        factory = new JestClientFactory();
        factory = new JestClientFactory();
        Set<String> serverList = new LinkedHashSet<>();
        String[] uris = jestHost.split(",");
        serverList.addAll(CollectionUtils.arrayToList(uris));
        factory.setHttpClientConfig(new HttpClientConfig
                .Builder(serverList)
        HttpClientConfig httpClientConfig = new HttpClientConfig
                .Builder(Arrays.asList(hostArray))//http://59.61.92.90:9065,http://59.61.92.90:9067
                .multiThreaded(true)
                .multiThreaded(true)
                .maxTotalConnection(50)// 最大链接
                .maxTotalConnection(50)// 最大链接
                .maxConnectionIdleTime(120, TimeUnit.SECONDS)//链接等待时间
                .maxConnectionIdleTime(120, TimeUnit.SECONDS)//链接等待时间
                .connTimeout(60 * 1000)
                .connTimeout(60 * 1000)
                // .discoveryEnabled(true)
                // .discoveryEnabled(true)
                .readTimeout(60 * 1000)//60秒
                .readTimeout(60 * 1000)//60秒
                .build());//得到链接
                .build();
        factory.setHttpClientConfig(httpClientConfig);//得到链接
    }
    }
    //-----------------------------------TransportClient----------------------------------------
    //-----------------------------------TransportClient----------------------------------------
    private TransportClient transportClient;
    public TransportClient getTransportClient() {
    public Client getTransportClient() {
        try {
        try {
            initTranClient();
            return transportClient;
            return elasticsearchTemplate.getClient();
        } catch (Exception e) {
        } catch (Exception e) {
            e.printStackTrace();
            e.printStackTrace();
        }
        }
        return null;
        return null;
    }
    }
    private synchronized void initTranClient() throws UnknownHostException {
        if (transportClient == null) {
            Settings settings = Settings.settingsBuilder()
                    // .put("client.transport.sniff", true)//开启嗅探功能
                    .put("cluster.name", StringUtils.isEmpty(clusterNames) ? "jkzl" : clusterNames)//默认集群名字是jkzl
                    .build();
            transportClient = TransportClient.builder().settings(settings).build();
            String[] ips = clusterNodes.split(",");
            for (String ip : ips) {
                String[] ipAndPost = ip.split(":");
                transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ipAndPost[0]), Integer.valueOf(ipAndPost[1])));
            }
    @Bean
    public ElasticsearchUtil elasticsearchUtil() {
        ElasticsearchUtil elasticsearchUtil = new ElasticsearchUtil();
        elasticsearchUtil.setElasticFactory(getTransportClient());
        return elasticsearchUtil;
    }
        }
    @Bean
    public ElastricSearchHelper elastricSearchHelper() {
        ElastricSearchHelper elastricSearchHelper = new ElastricSearchHelper();
        elastricSearchHelper.setElasticsearchUtil(elasticsearchUtil());
        return elastricSearchHelper;
    }
    }
}
}

+ 105 - 0
common-data-es-starter/src/main/java/com/yihu/base/es/config/ElasticsearchUtil.java

@ -0,0 +1,105 @@
package com.yihu.base.es.config;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
import com.alibaba.druid.sql.parser.SQLExprParser;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.nlpcn.es4sql.domain.Select;
import org.nlpcn.es4sql.jdbc.ObjectResult;
import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
import org.nlpcn.es4sql.parse.ElasticSqlExprParser;
import org.nlpcn.es4sql.parse.SqlParser;
import org.nlpcn.es4sql.query.AggregationQueryAction;
import org.nlpcn.es4sql.query.DefaultQueryAction;
import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Created by chenweida on 2017/7/17.
 * SELECT town,townName,sum(result1) result1 FROM wlyy_quota_test
 * where quotaCode='1'
 * group by town,townName , date_histogram(field='quotaDate','interval'='week')
 */
@Component
public class ElasticsearchUtil {
    private Logger logger = LoggerFactory.getLogger(ElasticsearchUtil.class);
    private Client elasticFactory;
    /**
     * 执行sql查询es
     *
     * @param sql
     * @return
     */
    public List<Map<String, Object>> excuteDataModel(String sql) {
        List<Map<String, Object>> returnModels = new ArrayList<>();
        try {
            SQLExprParser parser = new ElasticSqlExprParser(sql);
            SQLExpr expr = parser.expr();
            SQLQueryExpr queryExpr = (SQLQueryExpr) expr;
//            if (parser.getLexer().token() != Token.EOF) {
//                throw new ParserException("illegal sql expr : " + sql);
//            }
            Select select = null;
            select = new SqlParser().parseSelect(queryExpr);
            //通过抽象语法树,封装成自定义的Select,包含了select、from、where group、limit等
            AggregationQueryAction action = null;
            DefaultQueryAction queryAction = null;
            SqlElasticSearchRequestBuilder requestBuilder = null;
            if (select.isAgg) {
                //包含计算的的排序分组的
                action = new AggregationQueryAction(elasticFactory, select);
                requestBuilder = action.explain();
            } else {
                //封装成自己的Select对象
                Client client = elasticFactory;
                queryAction = new DefaultQueryAction(client, select);
                requestBuilder = queryAction.explain();
            }
            SearchResponse response = (SearchResponse) requestBuilder.get();
            Object queryResult = null;
            if (sql.toUpperCase().indexOf("GROUP") != -1 || sql.toUpperCase().indexOf("SUM") != -1) {
                queryResult = response.getAggregations();
            } else {
                queryResult = response.getHits();
            }
            ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(queryResult, true);
            List<String> heads = temp.getHeaders();
            temp.getLines().stream().forEach(one -> {
                try {
                    Map<String, Object> oneMap = new HashMap<String, Object>();
                    for (int i = 0; i < one.size(); i++) {
                        String key = null;
                        Object value = one.get(i);
                        key = heads.get(i);
                        oneMap.put(key, value);
                    }
                    returnModels.add(oneMap);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
        return returnModels;
    }
    public Client getElasticFactory() {
        return elasticFactory;
    }
    public void setElasticFactory(Client elasticFactory) {
        this.elasticFactory = elasticFactory;
    }
}

+ 53 - 67
common-data-es-starter/src/main/java/com/yihu/base/es/config/ElastricSearchHelper.java

@ -1,21 +1,14 @@
package com.yihu.base.es.config;
package com.yihu.base.es.config;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import com.yihu.base.es.config.model.SaveModel;
import com.yihu.base.es.config.model.ESIDEntity;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.client.JestResult;
import io.searchbox.core.*;
import io.searchbox.core.*;
import org.elasticsearch.action.update.UpdateRequest;
import org.slf4j.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.UpdateQueryBuilder;
import org.springframework.stereotype.Component;
import java.util.Iterator;
import java.util.List;
import java.util.List;
import java.util.Map;
import java.util.Map;
@ -24,15 +17,23 @@ import java.util.Map;
/**
/**
 * Created by chenweida on 2017/6/2.
 * Created by chenweida on 2017/6/2.
 */
 */
@Component
@Scope("prototype")
public class ElastricSearchHelper {
public class ElastricSearchHelper {
    private ElasticsearchUtil elasticsearchUtil;
    private Logger logger = LoggerFactory.getLogger(ElastricSearchHelper.class);
    private Logger logger = LoggerFactory.getLogger(ElastricSearchHelper.class);
    @Autowired
    @Autowired
    private ElasticFactory elasticFactory;
    private ElasticFactory elasticFactory;
    public Boolean save(String index, String type, List<SaveModel> sms) {
    /**
     * 新增
     *
     * @param index
     * @param type
     * @param sms
     * @return
     */
    public Boolean save(String index, String type, List<Object> sms) {
        JestClient jestClient = null;
        JestClient jestClient = null;
        try {
        try {
            //得到链接elasticFactory.getJestClient();
            //得到链接elasticFactory.getJestClient();
@ -40,7 +41,7 @@ public class ElastricSearchHelper {
            int success = 0;
            int success = 0;
            int error = 0;
            int error = 0;
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
            for (SaveModel obj : sms) {
            for (Object obj : sms) {
                try {
                try {
                    Index indexObj = new Index.Builder(obj).build();
                    Index indexObj = new Index.Builder(obj).build();
                    success++;
                    success++;
@ -98,7 +99,7 @@ public class ElastricSearchHelper {
    }
    }
    public Boolean update(String index, String type, List<SaveModel> sms) {
    public Boolean update(String index, String type, List<Object> sms) {
        JestClient jestClient = null;
        JestClient jestClient = null;
        BulkResult br = null;
        BulkResult br = null;
        try {
        try {
@ -109,11 +110,11 @@ public class ElastricSearchHelper {
            int error = 0;
            int error = 0;
            boolean isSuccessed = true;
            boolean isSuccessed = true;
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
            for (SaveModel obj : sms) {
            for (Object obj : sms) {
                try {
                try {
                    JSONObject jo = new JSONObject();
                    JSONObject jo = new JSONObject();
                    jo.put("doc", obj);
                    jo.put("doc", obj);
                    Update indexObj = new Update.Builder(jo.toString()).index(index).type(type).id(obj.getId()).build();
                    Update indexObj = new Update.Builder(jo.toString()).index(index).type(type).id(((ESIDEntity) obj).getId()).build();
                    bulk.addAction(indexObj);
                    bulk.addAction(indexObj);
                    success++;
                    success++;
                } catch (Exception e) {
                } catch (Exception e) {
@ -137,50 +138,50 @@ public class ElastricSearchHelper {
        return br.isSucceeded();
        return br.isSucceeded();
    }
    }
    /**
    /**
     * 删除
     * 修改
     */
     */
    public void deleteData(String index, String type, List<SaveModel> saveModels) {
    public boolean update(String index, String type, String _id, JSONObject source) {
        JestClient jestClient = null;
        JestClient jestClient = null;
        JestResult jestResult = null;
        try {
        try {
            jestClient = elasticFactory.getJestClient();
            jestClient = elasticFactory.getJestClient();
            //根据id批量删除
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
            for (SaveModel obj : saveModels) {
                Delete indexObj = new Delete.Builder(obj.getId()).build();
                bulk.addAction(indexObj);
            }
            BulkResult br = jestClient.execute(bulk.build());
            logger.info("delete data count:" + saveModels.size());
            logger.info("delete flag:" + br.isSucceeded());
            JSONObject docSource = new JSONObject();
            docSource.put("doc", source);
            Update update = new Update.Builder(docSource).index(index).type(type).id(_id).build();
            jestResult = jestClient.execute(update);
            logger.info("update info:" + jestResult.isSucceeded());
        } catch (Exception e) {
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("update fail:" + _id, e.getMessage());
            return false;
        } finally {
        } finally {
            if (jestClient != null) {
            if (jestClient != null) {
                jestClient.shutdownClient();
                jestClient.shutdownClient();
            }
            }
        }
        }
        return true;
    }
    }
    /**
    /**
     * 查询
     * 删除
     */
     */
    public SearchResult search(String index, String type, String queryStr) {
    public void delete(String index, String type, List<Object> ESIDEntitys) {
        JestClient jestClient = null;
        JestClient jestClient = null;
        SearchResult result = null;
        try {
        try {
            jestClient = elasticFactory.getJestClient();
            jestClient = elasticFactory.getJestClient();
            Search search = new Search.Builder(queryStr)
                    // multiple index or types can be added.
                    .addIndex(index)
                    .addType(type)
                    .build();
            result = jestClient.execute(search);
            logger.info("search data count:" + result.getTotal());
            //根据id批量删除
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
            for (Object obj : ESIDEntitys) {
                Delete indexObj = new Delete.Builder(((ESIDEntity) obj).getId()).build();
                bulk.addAction(indexObj);
            }
            BulkResult br = jestClient.execute(bulk.build());
            logger.info("delete data count:" + ESIDEntitys.size());
            logger.info("delete flag:" + br.isSucceeded());
        } catch (Exception e) {
        } catch (Exception e) {
            e.printStackTrace();
            e.printStackTrace();
        } finally {
        } finally {
@ -188,39 +189,24 @@ public class ElastricSearchHelper {
                jestClient.shutdownClient();
                jestClient.shutdownClient();
            }
            }
        }
        }
        return result;
    }
    }
    /**
    /**
     * 修改
     * 执行sql
     *
     * @param sql
     * @return
     */
     */
    public boolean update(String index, String type,String _id, JSONObject source) {
        JestClient jestClient = null;
        JestResult jestResult = null;
        try {
            jestClient = elasticFactory.getJestClient();
            JSONObject docSource = new JSONObject();
            docSource.put("doc",source);
            Update update = new Update.Builder(docSource).index(index).type(type).id(_id).build();
            jestResult = jestClient.execute(update);
            logger.info("update info:" + jestResult.isSucceeded());
        } catch (Exception e) {
            logger.error("update fail:" + _id,e.getMessage());
            return false;
        } finally {
            if (jestClient != null) {
                jestClient.shutdownClient();
            }
        }
        return true;
    public List<Map<String, Object>> excuceSQL(String sql) {
        return elasticsearchUtil.excuteDataModel(sql);
    }
    }
    public ElasticsearchUtil getElasticsearchUtil() {
        return elasticsearchUtil;
    }
    public static void main(String args[]){
        String json = "";
        JSONObject resultJsonObject = (JSONObject)JSONObject.parse(json);
        JSONObject jsonObject = (JSONObject)resultJsonObject.get("hits");
        System.out.println(jsonObject.get("total"));
    public void setElasticsearchUtil(ElasticsearchUtil elasticsearchUtil) {
        this.elasticsearchUtil = elasticsearchUtil;
    }
    }
}
}

+ 1 - 1
common-data-es-starter/src/main/java/com/yihu/base/es/config/model/SaveModel.java

@ -6,7 +6,7 @@ import io.searchbox.annotations.JestId;
 * es保存model的公共父类
 * es保存model的公共父类
 * Created by chenweida on 2017/11/3.
 * Created by chenweida on 2017/11/3.
 */
 */
public class SaveModel {
public class ESIDEntity {
    @JestId
    @JestId
    private String id;
    private String id;

+ 1 - 1
common-data-es-starter/src/main/resources/META-INF/spring.factories

@ -1 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.yihu.base.es.config.ElastricSearchHelper
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.yihu.base.es.config.ElasticFactory

+ 10 - 25
common-data-es-starter/src/main/resources/template.yml

@ -1,28 +1,13 @@
es:
  host:  59.61.92.90
  port: 9067 #默认是9200
  tPort: 9068 #http端口 默认是9300
  clusterName: jkzl
spring:
spring:
  data:
  data:
    elasticsearch: #ElasticsearchProperties
      cluster-name: jkzl #默认即为elasticsearch  集群名
      cluster-nodes: 120.25.194.233:9300,120.25.194.233:9300 #配置es节点信息,逗号分隔,如果没有指定,则启动ClientNode
      local: false ##是否本地连接
      properties: # Additional properties used to configure the client.
        enable: true
  # JEST (Elasticsearch HTTP client) (JestProperties)
  elasticsearch:
    jest:
      uris: http://172.17.110.217:9200,http://172.17.110.128:9200
      connection-timeout: # Connection timeout in milliseconds.
      multi-threaded: true # Enable connection requests from multiple execution threads.
      username: # Login user.
      password: # Login password.
      proxy.port:  # Proxy port the HTTP client should use.
      proxy.host:  # Proxy host the HTTP client should use.
    elasticsearch:
      cluster-name: jkzl #es集群的名字
      cluster-nodes: 172.19.103.68:9300  #多个逗号分割
      cluster-nodes-jest: http://172.19.103.68:9200  #多个逗号分割
      repositories:
        enabled: true
      properties:
        client:
          transport:
            sniff: false #开启嗅探集群  用nginx代理一层过后会出现ip解析失败问题