Progr1mmer před 6 roky
rodič
revize
fb104b763b

+ 8 - 8
elasticsearch-jest-starter/pom.xml

@ -16,21 +16,21 @@
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.nlpcn</groupId>
            <artifactId>elasticsearch-sql</artifactId>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
        </dependency>
        <dependency>
            <groupId>org.nlpcn</groupId>
            <artifactId>elasticsearch-sql</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
    </dependencies>
</project>

+ 49 - 0
elasticsearch-starter/pom.xml

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>jkzl-starter</artifactId>
        <groupId>com.yihu</groupId>
        <version>2.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>elasticsearch-starter</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
        </dependency>
        <dependency>
            <groupId>org.nlpcn</groupId>
            <artifactId>elasticsearch-sql</artifactId>
        </dependency>
        <dependency>
            <groupId>com.sun.jna</groupId>
            <artifactId>jna</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
    </dependencies>
</project>

+ 310 - 0
elasticsearch-starter/src/main/java/com/yihu/elasticsearch/ElasticSearchHelper.java

@ -0,0 +1,310 @@
package com.yihu.elasticsearch;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
import com.alibaba.druid.sql.parser.SQLExprParser;
import com.alibaba.fastjson.JSONObject;
import com.yihu.elasticsearch.model.ESIDEntity;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.core.*;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.nlpcn.es4sql.domain.Select;
import org.nlpcn.es4sql.jdbc.ObjectResult;
import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
import org.nlpcn.es4sql.parse.ElasticSqlExprParser;
import org.nlpcn.es4sql.parse.SqlParser;
import org.nlpcn.es4sql.query.AggregationQueryAction;
import org.nlpcn.es4sql.query.DefaultQueryAction;
import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.*;
/**
 * Utils - 主要基于(jestClient)的搜索工具
 * Created by chenweida on 2017/6/2.
 */
@Component
public class ElasticSearchHelper {
    private Logger logger = LoggerFactory.getLogger(ElasticSearchHelper.class);
    @Autowired
    private ElasticSearchPool elasticSearchPool;
    @Autowired
    private JestClient jestClient;
    /**
     * 新增
     *
     * @param index
     * @param type
     * @param sources
     * @return
     */
    public <T> Boolean save (String index, String type, List<T> sources) throws IOException {
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        sources.forEach(item -> {
            Index indexObj = new Index.Builder(item).build();
            bulk.addAction(indexObj);
        });
        BulkResult br = jestClient.execute(bulk.build());
        logger.debug("save flag: " + br.isSucceeded());
        return br.isSucceeded();
    }
    public Boolean save (String index, String type, String source) throws IOException {
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        Index indexObj = new Index.Builder(source).build();
        bulk.addAction(indexObj);
        BulkResult br = jestClient.execute(bulk.build());
        logger.debug("save flag: " + br.isSucceeded());
        return br.isSucceeded();
    }
    /**
     * 自定义ID
     *
     * @param index
     * @param type
     * @param source
     * @param idFieldString
     * @return
     */
    public Boolean saveWithCustomId (String index, String type, String source, String idFieldString) throws IOException {
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        JSONObject jsonObject = (JSONObject) (JSONObject.parse(source));
        Index indexObj = new Index.Builder(source).id(jsonObject.getString(idFieldString)).
                build();
        bulk.addAction(indexObj);
        BulkResult br = jestClient.execute(bulk.build());
        logger.debug("save flag: " + br.isSucceeded());
        return br.isSucceeded();
    }
    /**
     * 自定义ID
     *
     * @param index
     * @param type
     * @param sources
     * @param idFieldString
     * @return
     */
    public Boolean saveBulkWithCustomId (String index, String type, List<String> sources, String idFieldString) throws IOException {
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        sources.forEach(item -> {
            JSONObject jsonObject = (JSONObject) (JSONObject.parse(item));
            Index indexObj = new Index.Builder(item).id(jsonObject.getString(idFieldString)).
                    build();
            bulk.addAction(indexObj);
        });
        BulkResult br = jestClient.execute(bulk.build());
        logger.debug("save flag: " + br.isSucceeded());
        return br.isSucceeded();
    }
    /**
     * 更新
     *
     * @param index
     * @param type
     * @param sources
     * @return
     */
    public <T> Boolean update (String index, String type, List<T> sources) throws IOException {
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        sources.forEach(item -> {
            JSONObject jo = new JSONObject();
            jo.put("doc", item);
            Update indexObj = new Update.Builder(jo.toString()).index(index).type(type).id(((ESIDEntity) item).getId()).build();
            bulk.addAction(indexObj);
        });
        BulkResult br = jestClient.execute(bulk.build());
        logger.debug("update flag: " + br.isSucceeded());
        return br.isSucceeded();
    }
    /**
     * 更新
     * @param index
     * @param type
     * @param list
     * @return
     */
    public Boolean updateByMap(String index, String type, List<Map<String, Object>> list) throws IOException {
        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
        Iterator var10 = list.iterator();
        while (var10.hasNext()) {
            Map map = (Map)var10.next();
            JSONObject jo = new JSONObject();
            jo.put("doc", map);
            Update indexObj = (((new Update.Builder(jo.toString())).index(index)).type(type)).id(String.valueOf(map.get("id"))).build();
            bulk.addAction(indexObj);
        }
        BulkResult br = jestClient.execute(bulk.build());
        logger.debug("update flag: " + br.isSucceeded());
        return br.isSucceeded();
    }
    /**
     * 更新
     *
     * @param index
     * @param type
     * @param _id
     * @param source
     * @return
     */
    public boolean update(String index, String type, String _id, JSONObject source) throws IOException {
        JSONObject docSource = new JSONObject();
        docSource.put("doc", source);
        Update update = new Update.Builder(docSource).index(index).type(type).id(_id).build();
        JestResult jestResult = jestClient.execute(update);
        logger.debug("update info: " + jestResult.isSucceeded());
        return true;
    }
    /**
     * 删除
     */
    public boolean delete(String index, String type, List<Map<String, Object>> datas) throws IOException {
        //根据id批量删除
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        for (Map map : datas) {
            if (!map.containsKey("id") || !map.containsKey("_id")) {
                continue;
            }
            Delete indexObj = null;
            if (null != map.get("_id")) {
                indexObj = new Delete.Builder(map.get("_id").toString()).build();
            } else if (null != map.get("id")) {
                indexObj = new Delete.Builder(map.get("id").toString()).build();
            }
            bulk.addAction(indexObj);
        }
        BulkResult br = jestClient.execute(bulk.build());
        logger.debug("delete data count: " + datas.size());
        logger.debug("delete flag: " + br.isSucceeded());
        return br.isSucceeded();
    }
    public SearchResult search(String index, String type, String queryStr) throws IOException {
        Search search = ((new Search.Builder(queryStr)).addIndex(index)).addType(type).build();
        SearchResult result = jestClient.execute(search);
        this.logger.info("search data count: " + result.getTotal());
        return result;
    }
    /**
     * 执行sql
     *
     * @param sql
     * @return
     */
    public List<Map<String, Object>> executeSQL (String sql) throws Exception {
        List<Map<String, Object>> returnModels = new ArrayList<>();
        SQLExprParser parser = new ElasticSqlExprParser(sql);
        SQLExpr expr = parser.expr();
        SQLQueryExpr queryExpr = (SQLQueryExpr) expr;
        /*if (parser.getLexer().token() != Token.EOF) {
            throw new ParserException("illegal sql expr : " + sql);
        }*/
        Select select = new SqlParser().parseSelect(queryExpr);
        //通过抽象语法树,封装成自定义的Select,包含了select、from、where group、limit等
        AggregationQueryAction action = null;
        DefaultQueryAction queryAction = null;
        SqlElasticSearchRequestBuilder requestBuilder = null;
        if (select.isAgg) {
            //包含计算的的排序分组的
            action = new AggregationQueryAction(elasticSearchPool.getClient(), select);
            requestBuilder = action.explain();
        } else {
            //封装成自己的Select对象
            Client client = elasticSearchPool.getClient();
            queryAction = new DefaultQueryAction(client, select);
            requestBuilder = queryAction.explain();
        }
        SearchResponse response = (SearchResponse) requestBuilder.get();
        Object queryResult = null;
        if (sql.toUpperCase().indexOf("GROUP") != -1 || sql.toUpperCase().indexOf("SUM") != -1|| sql.toUpperCase().indexOf("COUNT(") != -1) {
            queryResult = response.getAggregations();
        } else {
            queryResult = response.getHits();
        }
        ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(queryResult, true);
        List<String> heads = temp.getHeaders();
        temp.getLines().stream().forEach(one -> {
            Map<String, Object> oneMap = new HashMap<String, Object>();
            for (int i = 0; i < one.size(); i++) {
                Object value = one.get(i);
                /*if(value instanceof Double){
                    Double valueTemp =Double.valueOf((Double)value);
                    DecimalFormat df = new DecimalFormat("######0");
                    value=df.format(valueTemp);
                }*/
                String key = heads.get(i);
                oneMap.put(key, value);
            }
            returnModels.add(oneMap);
        });
        return returnModels;
    }
    /**
     * 执行sql
     * @param sql
     * @return
     */
    public Integer executeCountSQL(String sql) throws Exception {
        SQLExprParser parser = new ElasticSqlExprParser(sql);
        SQLExpr expr = parser.expr();
        SQLQueryExpr queryExpr = (SQLQueryExpr) expr;
        /*if (parser.getLexer().token() != Token.EOF) {
            throw new ParserException("illegal sql expr : " + sql);
        }*/
        Select select = new SqlParser().parseSelect(queryExpr);
        //通过抽象语法树,封装成自定义的Select,包含了select、from、where group、limit等
        SqlElasticSearchRequestBuilder requestBuilder;
        if (select.isAgg) {
            //包含计算的的排序分组的
            AggregationQueryAction action = new AggregationQueryAction(elasticSearchPool.getClient(), select);
            requestBuilder = action.explain();
        } else {
            //封装成自己的Select对象
            Client client = elasticSearchPool.getClient();
            DefaultQueryAction queryAction = new DefaultQueryAction(client, select);
            requestBuilder = queryAction.explain();
        }
        SearchResponse response = (SearchResponse) requestBuilder.get();
        Object queryResult;
        if (sql.toUpperCase().indexOf("GROUP") != -1 || sql.toUpperCase().indexOf("SUM") != -1|| sql.toUpperCase().indexOf("COUNT(") != -1) {
            queryResult = response.getAggregations();
        } else {
            queryResult = response.getHits();
        }
        ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(queryResult, true);
        for (int j = 0; j < temp.getLines().size(); j ++){
            List<Object> one = temp.getLines().get(j);
            for (int i = 0; i < one.size(); i++) {
                Object value = one.get(i);
                if (value instanceof Double){
                    Double valueTemp =Double.valueOf((Double)value);
                    DecimalFormat df = new DecimalFormat("######0");
                    return Integer.parseInt(df.format(valueTemp));
                }
            }
        }
        return 0;
    }
}

+ 82 - 0
elasticsearch-starter/src/main/java/com/yihu/elasticsearch/ElasticSearchPool.java

@ -0,0 +1,82 @@
package com.yihu.elasticsearch;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.ElasticSearchDruidDataSourceFactory;
import com.yihu.elasticsearch.config.ElasticSearchConfig;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.net.InetSocketAddress;
import java.util.Properties;
/**
 * Pool - ES TransportClient 连接池
 * Created by progr1mmer on 2018/1/4.
 */
@Component
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class ElasticSearchPool {
    private static volatile TransportClient transportClient;
    @Autowired
    private ElasticSearchConfig elasticSearchConfig;
    private TransportClient getTransportClient() {
        Settings settings = Settings.builder()
                .put("cluster.name", elasticSearchConfig.getClusterName())
                .put("client.transport.sniff", elasticSearchConfig.isClientTransportSniff())
                .build();
        String[] nodeArr = elasticSearchConfig.getClusterNodes().split(",");
        InetSocketTransportAddress[] socketArr = new InetSocketTransportAddress[nodeArr.length];
        for (int i = 0; i < socketArr.length; i++) {
            if (!StringUtils.isEmpty(nodeArr[i])) {
                String[] nodeInfo = nodeArr[i].split(":");
                socketArr[i] = new InetSocketTransportAddress(new InetSocketAddress(nodeInfo[0], new Integer(nodeInfo[1])));
            }
        }
        return TransportClient.builder().settings(settings).build().addTransportAddresses(socketArr);
    }
    /**
     * 1.TransportClient本身支持多线程的数据请求
     * 2.移除多个TransportClient的线程池支持,减少Socket链接
     * 3.基于多重检查的单例模式,兼顾安全和效率
     * 4.为提高效率,使用完毕后请勿进行 transportClient.close() 的关闭操作
     * @return
     */
    public TransportClient getClient() {
        if (transportClient != null) {
            if (transportClient.connectedNodes().isEmpty()) {
                synchronized (TransportClient.class) {
                    if (transportClient.connectedNodes().isEmpty()) {
                        transportClient = getTransportClient();
                    }
                }
            }
            return transportClient;
        }
        synchronized (TransportClient.class) {
            if (null == transportClient) {
                transportClient = getTransportClient();
            }
        }
        return transportClient;
    }
    public DruidDataSource getDruidDataSource() throws Exception {
        Properties properties = new Properties();
        properties.put("url", "jdbc:elasticsearch://" + elasticSearchConfig.getClusterNodes() + "/");
        DruidDataSource druidDataSource = (DruidDataSource) ElasticSearchDruidDataSourceFactory
                .createDataSource(properties);
        druidDataSource.setInitialSize(1);
        return druidDataSource;
    }
}

+ 746 - 0
elasticsearch-starter/src/main/java/com/yihu/elasticsearch/ElasticSearchUtil.java

@ -0,0 +1,746 @@
package com.yihu.elasticsearch;
import com.alibaba.druid.pool.DruidDataSource;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.InternalCardinality;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.ParseException;
import java.util.*;
/**
 * Utils - 主要基于(TransportClient)的搜索工具
 * Created by progr1mmer on 2017/12/2.
 */
@Service
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class ElasticSearchUtil {
    @Autowired
    private ElasticSearchPool elasticSearchPool;
    /**
     * 创建映射
     *  注意:保存数据之前如果没有创建相应的字
     *  段映射会导致搜索结果不准确
     * @param index
     * @param type
     * @param source
     * @param setting - 该设置根据需要进行配置
     * @throws IOException
     */
    public void mapping (String index, String type, Map<String, Map<String, String>> source, Map<String, Object> setting) throws IOException{
        TransportClient transportClient = elasticSearchPool.getClient();
        XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("properties");
        for (String field : source.keySet()) {
            xContentBuilder.startObject(field);
            Map<String, String> propsMap = source.get(field);
            for (String prop : propsMap.keySet()) {
                xContentBuilder.field(prop, propsMap.get(prop));
            }
            xContentBuilder.endObject();
        }
        xContentBuilder.endObject().endObject();
        CreateIndexRequestBuilder createIndexRequestBuilder = transportClient.admin().indices().prepareCreate(index);
        createIndexRequestBuilder.addMapping(type, xContentBuilder);
        /*Map<String, Object> settingSource = new HashMap<>();
        settingSource.put("index.translog.flush_threshold_size", "1g"); //log文件大小
        settingSource.put("index.translog.flush_threshold_ops", "100000"); //flush触发次数
        settingSource.put("index.translog.durability", "async"); //异步更新
        settingSource.put("index.refresh_interval", "30s"); //刷新间隔
        settingSource.put("index.number_of_replicas", 1); //副本数
        settingSource.put("index.number_of_shards", 3); //分片数
        createIndexRequestBuilder.setSettings(settingSource);*/
        if (setting != null && !setting.isEmpty()) {
            createIndexRequestBuilder.setSettings(setting);
        }
        createIndexRequestBuilder.get();
    }
    /**
     * 移除索引 - 整个移除
     * @param index
     */
    public void remove (String index){
        TransportClient transportClient = elasticSearchPool.getClient();
        DeleteIndexRequestBuilder deleteIndexRequestBuilder = transportClient.admin().indices().prepareDelete(index);
        deleteIndexRequestBuilder.get();
    }
    /**
     * 添加数据
     * @param index
     * @param type
     * @param source
     * @return
     * @throws ParseException
     */
    public Map<String, Object> index (String index, String type, Map<String, Object> source) throws ParseException{
        TransportClient transportClient = elasticSearchPool.getClient();
        String _id = (String) source.remove("_id");
        if (StringUtils.isEmpty(_id)) {
            IndexResponse response = transportClient.prepareIndex(index, type).setSource(source).get();
            source.put("_id", response.getId());
        } else {
            IndexResponse response = transportClient.prepareIndex(index, type, _id).setSource(source).get();
            source.put("_id", response.getId());
        }
        return source;
    }
    /**
     * 批量添加数据 - 效率高
     * @param index
     * @param type
     * @param source
     * @throws ParseException
     */
    public void bulkIndex (String index, String type, List<Map<String, Object>> source) throws ParseException{
        if (source.size() > 0) {
            TransportClient transportClient = elasticSearchPool.getClient();
            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
            source.forEach(item -> {
                String _id = (String) item.remove("_id");
                if (StringUtils.isEmpty(_id)) {
                    bulkRequestBuilder.add(transportClient.prepareIndex(index, type).setSource(item));
                } else {
                    bulkRequestBuilder.add(transportClient.prepareIndex(index, type, _id).setSource(item));
                }
            });
            bulkRequestBuilder.get();
        }
    }
    /**
     * 删除数据
     * @param index
     * @param type
     * @param id
     */
    public void delete (String index, String type, String id) {
        TransportClient transportClient = elasticSearchPool.getClient();
        transportClient.prepareDelete(index, type, id).get();
    }
    /**
     * 批量删除数据
     * @param index
     * @param type
     * @param idArr
     */
    public void bulkDelete (String index, String type, String [] idArr) {
        if (idArr.length > 0) {
            TransportClient transportClient = elasticSearchPool.getClient();
            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
            for (String id : idArr) {
                bulkRequestBuilder.add(transportClient.prepareDelete(index, type, id));
            }
            bulkRequestBuilder.get();
        }
    }
    /**
     * 根据字段批量删除数据
     * @param index
     * @param type
     * @param field
     * @param value
     */
    public void deleteByField(String index, String type, String field, Object value) {
        deleteByFilter(index, type, field + "=" + value);
    }
    /**
     * 根据条件批量删除数据
     * @param index
     * @param type
     * @param filters
     */
    public void deleteByFilter(String index, String type, String filters) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        deleteByFilter(index, type, queryBuilder);
    }
    /**
     * 根据条件批量删除数据
     * @param index
     * @param type
     * @param queryBuilder
     */
    public void deleteByFilter(String index, String type, QueryBuilder queryBuilder) {
        List<String> idList = getIds(index, type, queryBuilder);
        if (idList.size() > 0) {
            TransportClient transportClient = elasticSearchPool.getClient();
            String [] idArr = new String[idList.size()];
            idArr = idList.toArray(idArr);
            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
            for (String id : idArr) {
                bulkRequestBuilder.add(transportClient.prepareDelete(index, type, id));
            }
            bulkRequestBuilder.get();
        }
    }
    /**
     * 更新数据 - 返回最新文档
     * @param index
     * @param type
     * @param id
     * @param source
     * @return
     * @throws DocumentMissingException
     */
    public Map<String, Object> update(String index, String type, String id, Map<String, Object> source) throws DocumentMissingException {
        TransportClient transportClient = elasticSearchPool.getClient();
        source.remove("_id");
        transportClient.prepareUpdate(index, type, id).setDoc(source).setRetryOnConflict(5).get();
        return findById(index, type, id);
    }
    /**
     * 更新数据 - 不返回文档
     * @param index
     * @param type
     * @param id
     * @param source
     * @throws DocumentMissingException
     */
    public void voidUpdate (String index, String type, String id, Map<String, Object> source) throws DocumentMissingException {
        TransportClient transportClient = elasticSearchPool.getClient();
        source.remove("_id");
        transportClient.prepareUpdate(index, type, id).setDoc(source).setRetryOnConflict(5).get();
    }
    /**
     * 批量更新数据
     * @param index
     * @param type
     * @param source
     * @throws DocumentMissingException
     */
    public void bulkUpdate(String index, String type, List<Map<String, Object>> source) throws DocumentMissingException {
        if (source.size() > 0) {
            TransportClient transportClient = elasticSearchPool.getClient();
            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
            source.forEach(item -> {
                String _id = (String)item.remove("_id");
                if (!StringUtils.isEmpty(_id)) {
                    bulkRequestBuilder.add(transportClient.prepareUpdate(index, type, _id).setDoc(item).setRetryOnConflict(5));
                }
            });
            bulkRequestBuilder.get();
        }
    }
    /**
     * 根据ID查找数据
     * @param index
     * @param type
     * @param id
     * @return
     */
    public Map<String, Object> findById(String index, String type, String id) {
        TransportClient transportClient = elasticSearchPool.getClient();
        GetRequest getRequest = new GetRequest(index, type, id);
        GetResponse response = transportClient.get(getRequest).actionGet();
        Map<String, Object> source = response.getSource();
        if (source != null) {
            source.put("_id", response.getId());
        }
        return source;
    }
    /**
     * 根据字段查找数据
     * @param index
     * @param type
     * @param field
     * @param value
     * @return
     */
    public List<Map<String, Object>> findByField(String index, String type, String field, Object value) {
        return list(index, type, field + "=" + value);
    }
    /**
     * 获取文档列表
     * @param index
     * @param type
     * @param filters
     * @return
     */
    public List<Map<String, Object>> list(String index, String type, String filters) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        return list(index, type, queryBuilder);
    }
    /**
     * 获取文档列表
     * @param index
     * @param type
     * @param queryBuilder
     * @return
     */
    public List<Map<String, Object>> list(String index, String type, QueryBuilder queryBuilder) {
        int size = (int)count(index, type, queryBuilder);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, 0, size);
        SearchResponse response = builder.get();
        SearchHits hits = response.getHits();
        List<Map<String, Object>> resultList = new ArrayList<Map<String, Object>>();
        for (SearchHit hit : hits.getHits()) {
            Map<String, Object> source = hit.getSource();
            source.put("_id", hit.getId());
            resultList.add(source);
        }
        return resultList;
    }
    /**
     * 获取文档分页
     * @param index
     * @param type
     * @param filters
     * @param page
     * @param size
     * @return
     */
    public Page<Map<String, Object>> page(String index, String type, String filters, int page, int size) {
        return page(index, type, filters, null, page, size);
    }
    /**
     * 获取分档分页 - 带分页功能
     * @param index
     * @param type
     * @param filters
     * @param sorts
     * @param page
     * @param size
     * @return
     */
    public Page<Map<String, Object>> page(String index, String type, String filters, String sorts, int page, int size) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        List<SortBuilder> sortBuilders = getSortBuilder(sorts);
        return page(index, type, queryBuilder, sortBuilders, page, size);
    }
    /**
     * 获取分档分页 - 带分页功能
     * @param index
     * @param type
     * @param queryBuilder
     * @param sortBuilders
     * @param page
     * @param size
     * @return
     */
    public Page<Map<String, Object>> page(String index, String type, QueryBuilder queryBuilder, List<SortBuilder> sortBuilders, int page, int size) {
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, sortBuilders, (page - 1) * size, size);
        SearchResponse response = builder.get();
        SearchHits hits = response.getHits();
        List<Map<String, Object>> resultList = new ArrayList<>();
        for (SearchHit hit : hits.getHits()) {
            Map<String, Object> source = hit.getSource();
            source.put("_id", hit.getId());
            resultList.add(source);
        }
        return new PageImpl<>(resultList, new PageRequest(page - 1, size), hits.totalHits());
    }
    /**
     * 获取ID列表
     * @param index
     * @param type
     * @param filters
     * @return
     */
    public List<String> getIds (String index, String type, String filters){
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        return getIds(index, type, queryBuilder);
    }
    /**
     * 获取ID列表
     * @param index
     * @param type
     * @param queryBuilder
     * @return
     */
    public List<String> getIds (String index, String type, QueryBuilder queryBuilder) {
        int size = (int)count(index, type, queryBuilder);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, 0, size);
        SearchResponse response = builder.get();
        SearchHits hits = response.getHits();
        List<String> resultList = new ArrayList<>();
        for (SearchHit hit : hits.getHits()) {
            resultList.add(hit.getId());
        }
        return resultList;
    }
    /**
     * 获取文档数
     * @param index
     * @param type
     * @param filters
     * @return
     */
    public long count(String index, String type, String filters) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        return count(index, type, queryBuilder);
    }
    /**
     * 获取文档数
     * @param index
     * @param type
     * @param queryBuilder
     * @return
     */
    public long count(String index, String type, QueryBuilder queryBuilder) {
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, null, null);
        return builder.get().getHits().totalHits();
    }
    /**
     * 根据SQL查找数据
     * @param field
     * @param sql
     * @return
     * @throws Exception
     */
    public List<Map<String, Object>> findBySql(List<String> field, String sql) throws Exception {
        List<Map<String, Object>> list = new ArrayList<>();
        DruidDataSource druidDataSource = null;
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            druidDataSource = elasticSearchPool.getDruidDataSource();
            connection = druidDataSource.getConnection();
            preparedStatement = connection.prepareStatement(sql);
            resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                Map<String, Object> rowData = new HashMap<>();
                for (String _field : field) {
                    rowData.put(_field, resultSet.getObject(_field));
                }
                list.add(rowData);
            }
            return list;
        } catch (Exception e) {
            if (!"Error".equals(e.getMessage())){
                e.printStackTrace();
            }
            return new ArrayList<>();
        } finally {
            if (resultSet != null) {
                resultSet.close();
            }
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (connection != null) {
                connection.close();
            }
            if (druidDataSource != null) {
                druidDataSource.close();
            }
        }
    }
    /**
     * 根据SQL查找数据
     * @param sql
     * @return
     * @throws Exception
     */
    public ResultSet findBySql(String sql) throws Exception {
        DruidDataSource druidDataSource = null;
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            druidDataSource = elasticSearchPool.getDruidDataSource();
            connection = druidDataSource.getConnection();
            preparedStatement = connection.prepareStatement(sql);
            resultSet = preparedStatement.executeQuery();
            return resultSet;
        } finally {
            if (resultSet != null) {
                resultSet.close();
            }
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (connection != null) {
                connection.close();
            }
            if (druidDataSource != null) {
                druidDataSource.close();
            }
        }
    }
    /**
     * 根据日期分组
     * @param index
     * @param type
     * @param filters
     * @param start
     * @param end
     * @param field
     * @param interval
     * @param format
     * @return
     */
    public Map<String, Long> dateHistogram(String index, String type, String filters, Date start, Date end, String field, DateHistogramInterval interval, String format) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, 0, 0);
        DateHistogramBuilder dateHistogramBuilder = new DateHistogramBuilder(index + "-" + field);
        dateHistogramBuilder.field(field);
        dateHistogramBuilder.interval(interval);
        if (!StringUtils.isEmpty(format)) {
            dateHistogramBuilder.format(format);
        }
        dateHistogramBuilder.minDocCount(0);
        dateHistogramBuilder.extendedBounds(start.getTime(), end.getTime());
        builder.addAggregation(dateHistogramBuilder);
        SearchResponse response = builder.get();
        Histogram histogram = response.getAggregations().get(index + "-" + field);
        Map<String, Long> temp = new HashMap<>();
        histogram.getBuckets().forEach(item -> temp.put(item.getKeyAsString(), item.getDocCount()));
        return temp;
    }
    /**
     * 查询去重数量
     * @param index
     * @param type
     * @param filters
     * @param filed
     * @return
     */
    public int cardinality(String index, String type, String filters, String filed){
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, 0, 0);
        CardinalityBuilder cardinality = AggregationBuilders.cardinality("cardinality").field(filed);
        builder.addAggregation(cardinality);
        SearchResponse response = builder.get();
        InternalCardinality internalCard = response.getAggregations().get("cardinality");
        return new Double(internalCard.getProperty("value").toString()).intValue();
    }
    /**
     * 分组统计
     * @param index
     * @param type
     * @param filters
     * @param groupField
     * @return
     */
    public Map<String, Long> countByGroup(String index, String type, String filters, String groupField) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, null, null);
        AbstractAggregationBuilder aggregation = AggregationBuilders.terms("count").field(groupField);
        builder.addAggregation(aggregation);
        SearchResponse response = builder.get();
        Terms terms = response.getAggregations().get("count");
        List<Terms.Bucket> buckets = terms.getBuckets();
        Map<String, Long> groupMap = new HashMap<>();
        for (Terms.Bucket bucket : buckets) {
            //System.out.println(bucket.getKey()+"----"+bucket.getDocCount());
            groupMap.put(bucket.getKey().toString(), bucket.getDocCount());
        }
        return groupMap;
    }
    /**
     * 分组求和
     * @param index
     * @param type
     * @param filters
     * @param sumField
     * @param groupField
     * @return
     */
    public Map<String, Double> sumByGroup(String index, String type, String filters, String sumField, String groupField) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, null, null);
        TermsBuilder aggregation = AggregationBuilders.terms("sum_query").field(groupField);
        SumBuilder sumBuilder= AggregationBuilders.sum("sum_row").field(sumField);
        aggregation.subAggregation(sumBuilder);
        builder.addAggregation(aggregation);
        SearchResponse response = builder.get();
        Terms terms = response.getAggregations().get("sum_query");
        List<Terms.Bucket> buckets = terms.getBuckets();
        Map<String, Double> groupMap = new HashMap<>();
        for (Terms.Bucket bucket : buckets){
            Sum sum2 = bucket.getAggregations().get("sum_row");
            groupMap.put(bucket.getKey().toString(), sum2.getValue());
        }
        return groupMap;
    }
    /**
     * 获取基础请求生成器
     * @param index
     * @param type
     * @param queryBuilder
     * @param sortBuilders
     * @return
     */
    public SearchRequestBuilder searchRequestBuilder(String index, String type, QueryBuilder queryBuilder, List<SortBuilder> sortBuilders, Integer from, Integer size) {
        TransportClient transportClient = elasticSearchPool.getClient();
        SearchRequestBuilder builder = transportClient.prepareSearch(index);
        builder.setTypes(type);
        builder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
        builder.setQuery(queryBuilder);
        builder.setExplain(true);
        if (sortBuilders != null) {
            sortBuilders.forEach(item -> builder.addSort(item));
        }
        if (from != null) {
            builder.setFrom(from);
        }
        if (size != null) {
            builder.setSize(size);
        }
        return builder;
    }
    /**
     * 排序语句转换
     * @param sorts
     * @return
     */
    public List<SortBuilder> getSortBuilder(String sorts) {
        List<SortBuilder> sortBuilderList = new ArrayList<>();
        if (StringUtils.isEmpty(sorts)) {
            return sortBuilderList;
        }
        String [] sortArr = sorts.split(";");
        for (String sort : sortArr) {
            String operator = sort.substring(0, 1);
            SortBuilder sortBuilder = new FieldSortBuilder(sort.substring(1));
            if ("-".equalsIgnoreCase(operator.trim())) {
                sortBuilder.order(SortOrder.DESC);
            } else if ("+".equalsIgnoreCase(operator.trim())) {
                sortBuilder.order(SortOrder.ASC);
            } else {
                sortBuilder.order(SortOrder.DESC);
            }
            sortBuilderList.add(sortBuilder);
        }
        return sortBuilderList;
    }
    /**
     * 查询语句转换
     * @param filters
     * @return
     */
    public QueryBuilder getQueryBuilder(String filters) {
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (StringUtils.isEmpty(filters)) {
            return boolQueryBuilder;
        }
        String [] filterArr = filters.split(";");
        for (String filter : filterArr) {
            if (filter.contains("||")){
                String [] fields = filter.split("\\|\\|");
                BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
                for (String filed : fields) {
                    String [] condition = filed.split("=");
                    queryBuilder.should(QueryBuilders.termQuery(condition[0], condition[1]));
                }
                boolQueryBuilder.must(queryBuilder);
            } else if (filter.contains("?")) {
                String [] condition = filter.split("\\?");
                MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchPhraseQuery(condition[0], condition[1]);
                boolQueryBuilder.must(matchQueryBuilder);
            } else if (filter.contains("<>")) {
                String [] condition = filter.split("<>");
                if (condition[1].contains(",")) {
                    String [] inCondition = condition[1].split(",");
                    TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(condition[0], inCondition);
                    boolQueryBuilder.mustNot(termsQueryBuilder);
                } else {
                    TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(condition[0], condition[1]);
                    boolQueryBuilder.mustNot(termQueryBuilder);
                }
            } else if (filter.contains(">=")) {
                String [] condition = filter.split(">=");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.gte(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains(">")) {
                String [] condition = filter.split(">");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.gt(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains("<=")) {
                String [] condition = filter.split("<=");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.lte(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains("<")) {
                String [] condition = filter.split("<");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.lt(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains("=")) {
                String [] condition = filter.split("=");
                if (condition[1].contains(",")) {
                    String [] inCondition = condition[1].split(",");
                    TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(condition[0], inCondition);
                    boolQueryBuilder.must(termsQueryBuilder);
                } else {
                    TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(condition[0], condition[1]);
                    boolQueryBuilder.must(termQueryBuilder);
                }
            }
        }
        return boolQueryBuilder;
    }
}

+ 56 - 0
elasticsearch-starter/src/main/java/com/yihu/elasticsearch/config/ElasticSearchConfig.java

@ -0,0 +1,56 @@
package com.yihu.elasticsearch.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
 * Config - ES TransportClient 配置
 * Created by progr1mmer on 2017/12/1.
 */
@ConfigurationProperties(prefix = "spring.elasticsearch")
@Configuration
public class ElasticSearchConfig {
    // 集群名称
    private String clusterName;
    // 节点
    private String clusterNodes;
    // 是否开启嗅探
    private boolean clientTransportSniff;
    public String getClusterName() {
        return clusterName;
    }
    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }
    public String getClusterNodes() {
        return clusterNodes;
    }
    public void setClusterNodes(String clusterNodes) {
        this.clusterNodes = clusterNodes;
    }
    public boolean isClientTransportSniff() {
        return clientTransportSniff;
    }
    public void setClientTransportSniff(boolean clientTransportSniff) {
        this.clientTransportSniff = clientTransportSniff;
    }
    @PostConstruct
    private void configInfo() {
        StringBuilder info = new StringBuilder("{");
        info.append("\n  elasticsearch.cluster-name = " + clusterName);
        info.append("\n  elasticsearch.cluster-nodes = " + clusterNodes);
        info.append("\n  elasticsearch.cluster-nodes = " + clientTransportSniff);
        info.append("\n}");
        System.out.println("Elasticsearch.configInfo : " + info.toString());
    }
}

+ 21 - 0
elasticsearch-starter/src/main/java/com/yihu/elasticsearch/model/ESIDEntity.java

@ -0,0 +1,21 @@
package com.yihu.elasticsearch.model;
import io.searchbox.annotations.JestId;
/**
 * Model - es保存model的公共父类
 * Created by chenweida on 2017/11/3.
 */
public class ESIDEntity {
    @JestId
    private String id;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
}

+ 2 - 2
elasticsearch-transport-starter/src/main/java/com/yihu/elasticsearch/transport/ElasticSearchPool.java

@ -1,8 +1,8 @@
package com.yihu.elasticsearch.transport;
package com.yihu.elasticsearch;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.ElasticSearchDruidDataSourceFactory;
import com.yihu.elasticsearch.transport.config.ElasticSearchConfig;
import com.yihu.elasticsearch.config.ElasticSearchConfig;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

+ 1 - 1
elasticsearch-transport-starter/src/main/java/com/yihu/elasticsearch/transport/ElasticSearchUtil.java

@ -1,4 +1,4 @@
package com.yihu.elasticsearch.transport;
package com.yihu.elasticsearch;
import com.alibaba.druid.pool.DruidDataSource;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;

+ 1 - 1
elasticsearch-transport-starter/src/main/java/com/yihu/elasticsearch/transport/config/ElasticSearchConfig.java

@ -1,4 +1,4 @@
package com.yihu.elasticsearch.transport.config;
package com.yihu.elasticsearch.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

+ 15 - 2
pom.xml

@ -21,11 +21,12 @@
        <module>fastdfs-starter</module>
        <module>swagger-starter</module>
        <module>mysql-starter</module>
        <module>elasticsearch-jest-starter</module>
        <module>hbase-starter</module>
        <module>solr-starter</module>
        <module>elasticsearch-transport-starter</module>
        <module>utils</module>
        <module>elasticsearch-starter</module>
        <module>elasticsearch-transport-starter</module>
        <module>elasticsearch-jest-starter</module>
    </modules>
    <!-- 项目发布到这个服务器 -->
@ -51,7 +52,9 @@
        <version.hibernate>5.0.12.Final</version.hibernate>
        <version.hibernate-validator>6.0.10.Final</version.hibernate-validator>
        <version.hibernate-jpa-api>1.0.0.Final</version.hibernate-jpa-api>
        <version.elasticsearch>2.4.0</version.elasticsearch>
        <version.elasticsearch-sql>2.4.1.0</version.elasticsearch-sql>
        <version.jest>2.4.0</version.jest>
        <verion.fastjson>1.2.17</verion.fastjson>
        <version.druid>1.0.15</version.druid>
        <version.jna>3.0.9</version.jna>
@ -126,11 +129,21 @@
            </dependency>
            <!-- Elasticsearch library -->
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>${version.elasticsearch}</version>
            </dependency>
            <dependency>
                <groupId>org.nlpcn</groupId>
                <artifactId>elasticsearch-sql</artifactId>
                <version>${version.elasticsearch-sql}</version>
            </dependency>
            <dependency>
                <groupId>io.searchbox</groupId>
                <artifactId>jest</artifactId>
                <version>${version.jest}</version>
            </dependency>
            <!-- Alibaba library -->
            <dependency>