
Statistics service

wangzhinan, 2 years ago
parent
commit
f5ec1d8171
18 files changed, with 1922 additions and 1205 deletions
  1. starter/elasticsearch-starter/pom.xml (+11, -7)
  2. starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/ElasticSearch7Util.java (+749, -7)
  3. svr/svr-quota/pom.xml (+8, -2)
  4. svr/svr-quota/src/main/java/com/yihu/jw/elasticsearch/ElasticSearchPool.java (+82, -81)
  5. svr/svr-quota/src/main/java/com/yihu/jw/elasticsearch/ElasticSearchUtil.java (+780, -780)
  6. svr/svr-quota/src/main/java/com/yihu/jw/elasticsearch/config/ElasticSearchConfig.java (+5, -1)
  7. svr/svr-quota/src/main/java/com/yihu/jw/quota/controller/ElasticSearchController.java (+65, -103)
  8. svr/svr-quota/src/main/java/com/yihu/jw/quota/controller/ElasticSearchEndPoint.java (+14, -13)
  9. svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/extract/ExtractPercentHelper.java (+2, -2)
  10. svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/extract/es/EsExtract.java (+2, -2)
  11. svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/extract/es/EsResultExtract.java (+13, -18)
  12. svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/save/LargDataWithRunnable.java (+17, -23)
  13. svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/save/es/ElastricSearchSave.java (+24, -20)
  14. svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/util/ElasticsearchUtil.java (+75, -71)
  15. svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/util/EsClientUtil.java (+59, -59)
  16. svr/svr-quota/src/main/java/com/yihu/jw/quota/job/EsQuotaJob.java (+2, -9)
  17. svr/svr-quota/src/main/java/com/yihu/jw/quota/job/EsQuotaPercentJob.java (+2, -7)
  18. svr/svr-quota/src/main/resources/application.yml (+12, -0)

+ 11 - 7
starter/elasticsearch-starter/pom.xml

@@ -21,25 +21,29 @@
            <artifactId>spring-data-commons</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-elasticsearch</artifactId>
        </dependency>
<!--        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.6.2</version>
        </dependency>-->
<!--        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.6.2</version>
        </dependency>-->
<!--        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.6.2</version>
        </dependency>-->
<!--        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>7.17.4</version>
        </dependency>-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>

+ 749 - 7
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/ElasticSearch7Util.java

@@ -1,27 +1,57 @@
package com.yihu.jw.elasticsearch;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
import com.alibaba.druid.sql.parser.ParserException;
import com.alibaba.druid.sql.parser.SQLExprParser;
import com.alibaba.druid.sql.parser.Token;
import com.alibaba.fastjson.JSONObject;
import io.searchbox.core.Update;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalCardinality;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.nlpcn.es4sql.domain.Select;
import org.nlpcn.es4sql.domain.Where;
import org.nlpcn.es4sql.exception.SqlParseException;
@@ -30,9 +60,6 @@ import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
import org.nlpcn.es4sql.parse.ElasticSqlExprParser;
import org.nlpcn.es4sql.parse.SqlParser;
import org.nlpcn.es4sql.parse.WhereParser;
import org.nlpcn.es4sql.query.AggregationQueryAction;
import org.nlpcn.es4sql.query.DefaultQueryAction;
import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder;
import org.nlpcn.es4sql.query.maker.QueryMaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,10 +67,15 @@ import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.DecimalFormat;
import java.util.*;
@@ -415,4 +447,714 @@ public class ElasticSearch7Util {
        }
        return list;
    }
    /**
     * @param boolQueryBuilder  查询参数 build
     * @param sortName 排序字段名称
     * @return
     */
    public List<Map<String, Object>> queryList(String index, BoolQueryBuilder boolQueryBuilder, String sortName, Integer size) throws IOException {
        //实例化查询请求对象,并指定要查询的索引
        SearchRequest request = new SearchRequest(index);
        //实例化SearchSourceBuilder,并将查询构造器注入
        SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
        searchBuilder.query(boolQueryBuilder);
        //排序字段不为空时按该字段倒序排序
        if (StringUtils.isNoneBlank(sortName)) {
            searchBuilder.sort(sortName, SortOrder.DESC);
        }
        //size不为空时限制返回条数
        if (size != null) {
            searchBuilder.size(size);
        }
        //将构建好的SearchSourceBuilder注入请求
        request.source(searchBuilder);
        //带入请求执行查询
        SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        //得到查询结果
        SearchHits hits = searchResponse.getHits();
        SearchHit[] searchHits = hits.getHits();
        List<Map<String,Object>> listData = new ArrayList<>();
        //遍历查询结果
        for(SearchHit hit : searchHits){
            Map<String,Object> datas = hit.getSourceAsMap();
            listData.add(datas);
            logger.info(datas.toString());
        }
        return listData;
        /* SearchResponse actionGet = null;
        SortBuilder dealSorter = null;
        if(sortName != null){
            dealSorter = SortBuilders.fieldSort(sortName).order(SortOrder.DESC);
        }else{
            dealSorter = SortBuilders.fieldSort("_id").order(SortOrder.DESC);
        }
        actionGet = client.prepareSearch(index)
                .setTypes(type)
                .setSize(size)
                .setQuery(boolQueryBuilder)
                .addSort(dealSorter)
                .execute().actionGet();
        SearchHits hits = actionGet.getHits();
        List<Map<String, Object>> matchRsult = new LinkedList<Map<String, Object>>();
        for (SearchHit hit : hits.getHits()){
            Map<String, Object> map = new HashMap<>() ;
            map = hit.getSource();
            map.put("id",hit.getId());
            matchRsult.add(map);
        }
        return matchRsult;*/
    }
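    /*
     * Usage sketch (illustrative only; the index name "base_patient" and the fields
     * "del" / "czrq" below are assumptions, not taken from this codebase):
     *
     *   BoolQueryBuilder query = QueryBuilders.boolQuery()
     *           .must(QueryBuilders.termQuery("del", "1"))
     *           .must(QueryBuilders.rangeQuery("czrq").gte("2022-01-01"));
     *   List<Map<String, Object>> rows = queryList("base_patient", query, "czrq", 100);
     */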
        /**
     * 创建映射
     *  注意:保存数据之前如果没有创建相应的字
     *  段映射会导致搜索结果不准确
     * @param index
     * @param source
     * @param setting - 该设置根据需要进行配置
     * @throws IOException
     */
    public void mapping (String index, Map<String, Map<String, String>> source, Map<String, Object> setting) throws IOException{
        XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("properties");
        for (String field : source.keySet()) {
            xContentBuilder.startObject(field);
            Map<String, String> propsMap = source.get(field);
            for (String prop : propsMap.keySet()) {
                xContentBuilder.field(prop, propsMap.get(prop));
            }
            xContentBuilder.endObject();
        }
        xContentBuilder.endObject().endObject();
        CreateIndexRequest request = new CreateIndexRequest(index);
        request.mapping(xContentBuilder);
        /*Map<String, Object> settingSource = new HashMap<>();
        settingSource.put("index.translog.flush_threshold_size", "1g"); //log文件大小
        settingSource.put("index.translog.flush_threshold_ops", "100000"); //flush触发次数
        settingSource.put("index.translog.durability", "async"); //异步更新
        settingSource.put("index.refresh_interval", "30s"); //刷新间隔
        settingSource.put("index.number_of_replicas", 1); //副本数
        settingSource.put("index.number_of_shards", 3); //分片数
        createIndexRequestBuilder.setSettings(settingSource);*/
        if (setting != null && !setting.isEmpty()) {
            request.settings(setting);
        }
        restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
    }
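    /*
     * Usage sketch (illustrative only; field names, types and setting values below
     * are assumptions) showing how the source map and settings expected by mapping()
     * might be built:
     *
     *   Map<String, Map<String, String>> fields = new HashMap<>();
     *   Map<String, String> idProps = new HashMap<>();
     *   idProps.put("type", "keyword");
     *   fields.put("patientId", idProps);
     *   Map<String, String> timeProps = new HashMap<>();
     *   timeProps.put("type", "date");
     *   timeProps.put("format", "yyyy-MM-dd HH:mm:ss");
     *   fields.put("czrq", timeProps);
     *   Map<String, Object> settings = new HashMap<>();
     *   settings.put("index.number_of_shards", 3);
     *   settings.put("index.number_of_replicas", 1);
     *   mapping("base_patient", fields, settings);
     */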
        /**
     * 添加数据
     * @param index
     * @param source
     * @return
     * @throws IOException
     */
    public Map<String, Object> index(String index, Map<String, Object> source) throws IOException {
        String _id = (String) source.remove("_id");
        if (StringUtils.isEmpty(_id)) {
            IndexRequest request = new IndexRequest(index);
            request.source(source);
            IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
            source.put("_id", response.getId());
        } else {
            UpdateRequest request = new UpdateRequest();
            request.index(index).id(_id);
            request.doc(JSONObject.toJSONString(source), XContentType.JSON);
            UpdateResponse response = restHighLevelClient.update(request, RequestOptions.DEFAULT);
            source.put("_id", response.getId());
        }
        return source;
    }
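    /*
     * Usage sketch (illustrative only; field names are assumptions): index() inserts
     * when the map has no "_id" and updates the existing document when it does.
     *
     *   Map<String, Object> doc = new HashMap<>();
     *   doc.put("name", "test");
     *   doc.put("czrq", "2022-01-01 00:00:00");
     *   Map<String, Object> saved = index("base_patient", doc);   // insert, "_id" is filled in
     *   saved.put("name", "test-2");
     *   index("base_patient", saved);                             // update by "_id"
     */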
        /**
     * 批量删除数据
     * @param index
     * @param idArr
     */
    public void bulkDelete (String index, String [] idArr) throws IOException {
        if (idArr.length > 0) {
            BulkRequest bulkRequest = new BulkRequest();
            for (String id : idArr) {
                DeleteRequest request = new DeleteRequest(index);
                request.id(id);
                bulkRequest.add(request);
            }
            BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            // 4、处理响应结果
            System.out.println("批量删除是否失败:" + response.hasFailures());
            // 4.1、删除详细信息
            for (BulkItemResponse itemResponse : response) {
                BulkItemResponse.Failure failure = itemResponse.getFailure();
                if (failure == null) {
                    System.out.println("删除成功的文档id:" + itemResponse.getId());
                } else {
                    System.out.println("删除失败的文档id:" + itemResponse.getId());
                }
            }
        }
    }
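    /*
     * Usage sketch (illustrative only; the ids are placeholders): deleting several
     * documents by id in a single bulk request.
     *
     *   bulkDelete("base_patient", new String[]{"id-1", "id-2"});
     */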
        /**
     * 根据条件批量删除数据
     * @param index
     * @param queryBuilder
     */
    public void deleteByFilter(String index,  QueryBuilder queryBuilder) throws IOException {
        long count = count(index, queryBuilder);
        long page = count/10000 == 0 ? 1 :count/10000 +1;
        for (long i =0;i<page;i++) {
            List<String> idList = getIds(index, queryBuilder);
            if (idList.size() > 0) {
                String[] idArr = new String[idList.size()];
                idArr = idList.toArray(idArr);
                BulkRequest bulkRequest = new BulkRequest();
                for (String id : idArr) {
                    DeleteRequest request = new DeleteRequest(index);
                    request.id(id);
                    bulkRequest.add(request);
                }
                BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                // 4、处理响应结果
                System.out.println("批量删除是否失败:" + response.hasFailures());
                // 4.1、删除详细信息
                for (BulkItemResponse itemResponse : response) {
                    BulkItemResponse.Failure failure = itemResponse.getFailure();
                    if (failure == null) {
                        System.out.println("删除成功的文档id:" + itemResponse.getId());
                    } else {
                        System.out.println("删除失败的文档id:" + itemResponse.getId());
                    }
                }
            }
        }
    }
    /**
     * 根据字段批量删除数据
     * @param index
     * @param type
     * @param field
     * @param value
     */
    public void deleteByField(String index, String type, String field, Object value) throws IOException {
        deleteByFilter(index, type, field + "=" + value);
    }
    /**
     * 根据条件批量删除数据
     * @param index
     * @param type
     * @param filters
     */
    public void deleteByFilter(String index, String type, String filters) throws IOException {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        deleteByFilter(index, queryBuilder);
    }
   /* *//**
     * 根据条件批量删除数据
     * @param index
     * @param type
     * @param queryBuilder
     *//*
    public void deleteByFilter(String index, String type, QueryBuilder queryBuilder) throws IOException {
        long count = count(index, type, queryBuilder);
        long page = count/10000 == 0 ? 1 :count/10000 +1;
        for (long i =0;i<page;i++) {
            List<String> idList = getIds(index, type, queryBuilder);
            if (idList.size() > 0) {
                String[] idArr = new String[idList.size()];
                idArr = idList.toArray(idArr);
                BulkRequest bulkRequest = new BulkRequest();
                for (String id : idArr) {
                    DeleteRequest request = new DeleteRequest(index);
                    request.id(id);
                    bulkRequest.add(request);
                }
                BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                // 4、处理响应结果
                System.out.println("批量删除是否失败:" + response.hasFailures());
                // 4.1、删除详细信息
                for (BulkItemResponse itemResponse : response) {
                    BulkItemResponse.Failure failure = itemResponse.getFailure();
                    if (failure == null) {
                        System.out.println("删除成功的文档id:" + itemResponse.getId());
                    } else {
                        System.out.println("删除失败的文档id:" + itemResponse.getId());
                    }
                }
            }
        }
    }*/
    /**
     * 更新数据 - 返回最新文档
     * @param index
     * @param id
     * @param source
     * @return
     * @throws DocumentMissingException
     */
    public Map<String, Object> update(String index,  String id, Map<String, Object> source) throws DocumentMissingException, IOException {
        source.remove("_id");
        UpdateRequest request = new UpdateRequest();
        request.index(index).id("_id");
        request.doc(JSONObject.toJSONString(source), XContentType.JSON);
        request.retryOnConflict(5);
        UpdateResponse response = restHighLevelClient.update(request, RequestOptions.DEFAULT);
        return findById(index,  id);
    }
    /**
     * 更新数据 - 不返回文档
     * @param index
     * @param id
     * @param source
     * @throws DocumentMissingException
     */
    public void voidUpdate (String index, String id, Map<String, Object> source) throws IOException {
        source.remove("_id");
        UpdateRequest request = new UpdateRequest();
        request.index(index);
        request.id(id);
        request.doc(source);
        request.retryOnConflict(5);
        UpdateResponse response = restHighLevelClient.update(request,RequestOptions.DEFAULT);
    }
    /**
     * 批量更新数据
     * @param index
     * @param source
     * @throws DocumentMissingException
     */
    public void bulkUpdate(String index, List<Map<String, Object>> source) throws DocumentMissingException, IOException {
        if (source.size() > 0) {
            BulkRequest bulkRequest = new BulkRequest();
            source.forEach(item -> {
                String _id = (String)item.remove("_id");
                if (!org.springframework.util.StringUtils.isEmpty(_id)) {
                    UpdateRequest request = new UpdateRequest();
                    request.index(index);
                    request.id(_id);
                    //必须设置doc,否则批量更新不会写入任何内容
                    request.doc(JSONObject.toJSONString(item), XContentType.JSON);
                    request.retryOnConflict(5);
                    bulkRequest.add(request);
                }
            });
            BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest,RequestOptions.DEFAULT);
        }
    }
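    /*
     * Usage sketch (illustrative only; the id and field values are placeholders):
     * every map passed to bulkUpdate() must carry the document id under "_id";
     * entries without it are skipped.
     *
     *   List<Map<String, Object>> docs = new ArrayList<>();
     *   Map<String, Object> doc = new HashMap<>();
     *   doc.put("_id", "existing-doc-id");
     *   doc.put("status", 2);
     *   docs.add(doc);
     *   bulkUpdate("base_patient", docs);
     */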
    /**
     * 根据ID查找数据
     * @param index
     * @param id
     * @return
     */
    public Map<String, Object> findById(String index, String id) throws IOException {
        GetRequest request = new GetRequest(index);
        request.id(id);
        GetResponse response = restHighLevelClient.get(request, RequestOptions.DEFAULT);
        Map<String, Object> source = response.getSource();
        if (source != null) {
            source.put("_id", response.getId());
        }
        return source;
    }
    /**
     * 根据字段查找数据
     * @param index
     * @param field
     * @param value
     * @return
     */
    public List<Map<String, Object>> findByField(String index, String field, Object value) throws IOException {
        return list(index, field + "=" + value);
    }
    /**
     * 获取文档列表
     * @param index
     * @param filters
     * @return
     */
    public List<Map<String, Object>> list(String index,  String filters) throws IOException {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        return list(index, queryBuilder);
    }
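    /*
     * Usage sketch (illustrative only): the exact filter grammar is defined by
     * getQueryBuilder(), which is not part of this diff; findByField() above only
     * shows that a single condition looks like "field=value". Assuming an index
     * "base_patient" with a "del" field, a call with one condition might be:
     *
     *   List<Map<String, Object>> rows = list("base_patient", "del=1");
     */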
    /**
     * 获取文档列表
     * @param index
     * @param queryBuilder
     * @return
     */
    public List<Map<String, Object>> list(String index,  QueryBuilder queryBuilder) throws IOException {
        int size = (int)count(index, queryBuilder);
        SearchSourceBuilder builder = SearchSourceBuilder(queryBuilder, null, 0, size);
        SearchRequest request = new SearchRequest(index);
        request.searchType(SearchType.DFS_QUERY_THEN_FETCH);
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        List<Map<String, Object>> resultList = new ArrayList<Map<String, Object>>();
        for (SearchHit hit : hits.getHits()) {
            Map<String, Object> source = hit.getSourceAsMap();
            source.put("_id", hit.getId());
            resultList.add(source);
        }
        return resultList;
    }
    /**
     * 获取文档分页
     * @param index
     * @param type
     * @param filters
     * @param page
     * @param size
     * @return
     */
    public Page<Map<String, Object>> page(String index, String type, String filters, int page, int size) throws IOException {
        return page(index, type, filters, null, page, size);
    }
    /**
     * 获取文档分页
     * @param index
     * @param type
     * @param filters
     * @param sorts
     * @param page
     * @param size
     * @return
     */
    public Page<Map<String, Object>> pageBySort(String index, String type, String filters, String sorts, int page, int size) throws IOException {
        return page(index, type, filters, sorts, page, size);
    }
    /**
     * 获取文档分页 - 带分页功能
     * @param index
     * @param type
     * @param filters
     * @param sorts
     * @param page
     * @param size
     * @return
     */
    public Page<Map<String, Object>> page(String index, String type, String filters, String sorts, int page, int size) throws IOException {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        List<SortBuilder> sortBuilders = getSortBuilder(sorts);
        return page(index, type, queryBuilder, sortBuilders, page, size);
    }
    /**
     * 获取文档分页 - 带分页功能
     * @param index
     * @param type
     * @param queryBuilder
     * @param sortBuilders
     * @param page
     * @param size
     * @return
     */
    public Page<Map<String, Object>> page(String index, String type, QueryBuilder queryBuilder, List<SortBuilder> sortBuilders, int page, int size) throws IOException {
        SearchSourceBuilder builder = SearchSourceBuilder(queryBuilder, sortBuilders, (page - 1) * size, size);
        SearchRequest request = new SearchRequest(index);
        request.searchType(SearchType.DFS_QUERY_THEN_FETCH);
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        List<Map<String, Object>> resultList = new ArrayList<>();
        for (SearchHit hit : hits.getHits()) {
            Map<String, Object> source = hit.getSourceAsMap();
            source.put("_id", hit.getId());
            resultList.add(source);
        }
        PageRequest pageRequest = PageRequest.of(page-1,size);
        return new PageImpl<>(resultList, pageRequest, hits.getTotalHits().value);
    }
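    /*
     * Usage sketch (illustrative only; index and field names are assumptions; the
     * type argument is kept for signature compatibility and is not used here):
     *
     *   QueryBuilder query = QueryBuilders.termQuery("del", "1");
     *   List<SortBuilder> sorts = getSortBuilder("-czrq");
     *   Page<Map<String, Object>> result = page("base_patient", null, query, sorts, 1, 20);
     *   long total = result.getTotalElements();
     *   List<Map<String, Object>> rows = result.getContent();
     */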
    /**
     * 获取ID列表
     * @param index
     * @param filters
     * @return
     */
    public List<String> getIds (String index,  String filters) throws IOException {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        return getIds(index,  queryBuilder);
    }
    /**
     * 获取ID列表
     * @param index
     * @param queryBuilder
     * 最多只能一万条
     * @return
     */
    public List<String> getIds (String index, QueryBuilder queryBuilder) throws IOException {
        int size = (int)count(index,queryBuilder);
        size = size > 10000 ? 10000:size;
        SearchSourceBuilder builder = SearchSourceBuilder(queryBuilder, null, 0, size);
        SearchRequest request = new SearchRequest(index);
        request.searchType(SearchType.DFS_QUERY_THEN_FETCH);
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        List<String> resultList = new ArrayList<>();
        for (SearchHit hit : hits.getHits()) {
            resultList.add(hit.getId());
        }
        return resultList;
    }
    /**
     * 获取文档数
     * @param index
     * @param type
     * @param filters
     * @return
     */
    public long count(String index, String type, String filters) throws IOException {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        return count(index,queryBuilder);
    }
    /**
     * 获取文档数
     * @param index
     * @param queryBuilder
     * @return
     */
    public long count(String index, QueryBuilder queryBuilder) throws IOException {
        SearchSourceBuilder builder = SearchSourceBuilder(queryBuilder, null, null, null);
        //ES7默认只精确统计前10000条,需开启trackTotalHits才能返回真实总数
        builder.trackTotalHits(true);
        SearchRequest request = new SearchRequest(index);
        request.searchType(SearchType.DFS_QUERY_THEN_FETCH);
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        return response.getHits().getTotalHits().value;
    }
    /**
     * 根据日期分组
     * @param index
     * @param filters
     * @param start
     * @param end
     * @param field
     * @param interval
     * @param format
     * @return
     */
    public Map<String, Long> dateHistogram(String index, String filters, Date start, Date end, String field, DateHistogramInterval interval, String format) throws IOException {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchSourceBuilder builder = SearchSourceBuilder(queryBuilder, null, 0, 0);
        DateHistogramAggregationBuilder dateHistogramBuilder = new DateHistogramAggregationBuilder(index + "-" + field);
        dateHistogramBuilder.field(field);
        dateHistogramBuilder.fixedInterval(interval);
        if (!org.springframework.util.StringUtils.isEmpty(format)) {
            dateHistogramBuilder.format(format);
        }
        dateHistogramBuilder.minDocCount(0);
        dateHistogramBuilder.extendedBounds(new ExtendedBounds(start.getTime(), end.getTime()));
        builder.aggregation(dateHistogramBuilder);
        SearchRequest request = new SearchRequest(index);
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        Histogram histogram = response.getAggregations().get(index + "-" + field);
        Map<String, Long> temp = new HashMap<>();
        histogram.getBuckets().forEach(item -> temp.put(item.getKeyAsString(), item.getDocCount()));
        return temp;
    }
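    /*
     * Usage sketch (illustrative only; index, filter and field names are assumptions):
     * daily buckets over a date field for the last 30 days.
     *
     *   Date end = new Date();
     *   Date start = new Date(end.getTime() - 30L * 24 * 3600 * 1000);   // last 30 days
     *   Map<String, Long> buckets = dateHistogram("base_patient", "del=1", start, end,
     *           "createTime", DateHistogramInterval.DAY, "yyyy-MM-dd");
     */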
    /**
     * 查询去重数量
     * @param index
     * @param filters
     * @param filed
     * @return
     */
    public int cardinality(String index, String filters, String filed) throws IOException {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchSourceBuilder builder = SearchSourceBuilder(queryBuilder, null, 0, 0);
        CardinalityAggregationBuilder cardinality = AggregationBuilders.cardinality("cardinality").field(filed);
        builder.aggregation(cardinality);
        SearchRequest request = new SearchRequest(index);
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        InternalCardinality internalCard = response.getAggregations().get("cardinality");
        return new Double(internalCard.getProperty("value").toString()).intValue();
    }
    /**
     * 分组统计
     * @param index
     * @param filters
     * @param groupField
     * @return
     */
    public Map<String, Long> countByGroup(String index, String filters, String groupField) throws IOException {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchSourceBuilder builder = SearchSourceBuilder(queryBuilder, null, null, null);
        AbstractAggregationBuilder aggregation = AggregationBuilders.terms("count").field(groupField);
        builder.aggregation(aggregation);
        SearchRequest request = new SearchRequest(index);
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        Terms terms = response.getAggregations().get("count");
        List<Terms.Bucket> buckets = (List<Terms.Bucket>) terms.getBuckets();
        Map<String, Long> groupMap = new HashMap<>();
        for (Terms.Bucket bucket : buckets) {
            //System.out.println(bucket.getKey()+"----"+bucket.getDocCount());
            groupMap.put(bucket.getKey().toString(), bucket.getDocCount());
        }
        return groupMap;
    }
    /**
     * 分组求和
     * @param index
     * @param filters
     * @param sumField
     * @param groupField
     * @return
     */
    public Map<String, Double> sumByGroup(String index, String filters, String sumField, String groupField) throws IOException {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchSourceBuilder builder = SearchSourceBuilder( queryBuilder, null, null, null);
        TermsAggregationBuilder aggregation = AggregationBuilders.terms("sum_query").field(groupField);
        SumAggregationBuilder sumBuilder= AggregationBuilders.sum("sum_row").field(sumField);
        aggregation.subAggregation(sumBuilder);
        builder.aggregation(aggregation);
        SearchRequest request = new SearchRequest(index);
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        Terms terms = response.getAggregations().get("sum_query");
        List<Terms.Bucket> buckets = (List<Terms.Bucket>) terms.getBuckets();
        Map<String, Double> groupMap = new HashMap<>();
        for (Terms.Bucket bucket : buckets){
            Sum sum2 = bucket.getAggregations().get("sum_row");
            groupMap.put(bucket.getKey().toString(), sum2.getValue());
        }
        return groupMap;
    }
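    /*
     * Usage sketch (illustrative only; the index "base_order" and the fields "del",
     * "amount", "town" are assumptions; the filters string follows whatever grammar
     * getQueryBuilder() implements, which is not in this diff):
     *
     *   Map<String, Double> sumByTown = sumByGroup("base_order", "del=1", "amount", "town");
     *   Map<String, Long> countByTown = countByGroup("base_order", "del=1", "town");
     */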
    /**
     * 获取基础请求生成器
     * @param queryBuilder
     * @param sortBuilders
     * @return
     */
    public SearchSourceBuilder SearchSourceBuilder(QueryBuilder queryBuilder, List<SortBuilder> sortBuilders, Integer from, Integer size) {
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.query(queryBuilder);
        builder.explain(true);
        if (sortBuilders != null) {
            sortBuilders.forEach(item -> builder.sort(item));
        }
        if (from != null) {
            builder.from(from);
        }
        if (size != null) {
            builder.size(size);
        }
        return builder;
    }
    /**
     * 排序语句转换
     * @param sorts
     * @return
     */
    public List<SortBuilder> getSortBuilder(String sorts) {
        List<SortBuilder> sortBuilderList = new ArrayList<>();
        if (org.springframework.util.StringUtils.isEmpty(sorts)) {
            return sortBuilderList;
        }
        String [] sortArr = sorts.split(";");
        for (String sort : sortArr) {
            String operator = sort.substring(0, 1);
            SortBuilder sortBuilder = new FieldSortBuilder(sort.substring(1));
            if ("-".equalsIgnoreCase(operator.trim())) {
                sortBuilder.order(SortOrder.DESC);
            } else if ("+".equalsIgnoreCase(operator.trim())) {
                sortBuilder.order(SortOrder.ASC);
            } else {
                sortBuilder.order(SortOrder.DESC);
            }
            sortBuilderList.add(sortBuilder);
        }
        return sortBuilderList;
    }
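    /*
     * Usage sketch (illustrative only; field names are assumptions): the sorts string
     * parsed above is a ";"-separated list where each entry starts with "-" (descending)
     * or "+" (ascending); anything else falls back to descending.
     *
     *   List<SortBuilder> sorts = getSortBuilder("-czrq;+name");
     */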
        /**
     * 根据SQL查找数据
     * @param field
     * @param sql
     * @return
     * @throws Exception
     */
    public List<Map<String, Object>> findBySql(List<String> field, String sql) throws Exception {
        BoolQueryBuilder boolQuery = null;
        try {
            SQLExprParser parser = new ElasticSqlExprParser(sql);
            SQLQueryExpr sqlExpr = (SQLQueryExpr) toSqlExpr(sql);
            SqlParser sqlParser = new SqlParser();
            MySqlSelectQueryBlock query = (MySqlSelectQueryBlock) sqlExpr.getSubQuery().getQuery();
            WhereParser whereParser = new WhereParser(sqlParser, query);
            Where where = whereParser.findWhere();
            if (where != null) {
                boolQuery = QueryMaker.explan(where);
            }
        } catch (SqlParseException e) {
            logger.info("ReadES.createQueryBuilderByExpress-Exception,"+e.getMessage());
            e.printStackTrace();
        }
        //实例化查询请求对象
        SearchRequest request = new SearchRequest();
        //实例化SearchSourceBuilder
        SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
        //根据索引、查询条件构建查询构造器
        //将查询构造器注入SearchSourceBuilder
        searchBuilder.query(boolQuery);
        //设置请求查询的索引(查询构造器中已指定,无需重复设置)
        //request.indices(indexName);
        //将构建好的SearchSourceBuilder注入请求
        request.source(searchBuilder);
        //带入请求执行查询
        SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        //得到查询结果
        SearchHits hits = searchResponse.getHits();
        SearchHit[] searchHits = hits.getHits();
        List<Map<String,Object>> listData = new ArrayList<>();
        //遍历查询结果
        for(SearchHit hit : searchHits){
            Map<String,Object> datas = hit.getSourceAsMap();
            Map<String,Object> result = new HashMap<>();
            for (String _field : field) {
                result.put(_field, datas.get(_field));
            }
            listData.add(result);
            logger.info(result.toString());
        }
      return listData;
    }
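    /*
     * Usage sketch (illustrative only; the SQL, index name and fields are assumptions):
     * findBySql() parses the WHERE clause with the es4sql parser and returns only the
     * requested fields from each hit.
     *
     *   List<Map<String, Object>> rows = findBySql(
     *           Arrays.asList("patientId", "czrq"),
     *           "select patientId, czrq from base_patient where del = '1'");
     */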
}

+ 8 - 2
svr/svr-quota/pom.xml

@@ -136,15 +136,21 @@
                   <artifactId>commons-data-solr</artifactId>
               </dependency>-->
        <!-- ElasticSearch -->
      <!--  <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.nlpcn</groupId>
            <artifactId>elasticsearch-sql</artifactId>
            <version>2.4.1.0</version>
        </dependency>-->
        <dependency>
            <groupId>com.yihu.jw</groupId>
            <artifactId>elasticsearch-starter</artifactId>
            <version>${version.wlyy-common}</version>
        </dependency>
        <!-- ElasticSearch -->
        <dependency>

+ 82 - 81
svr/svr-quota/src/main/java/com/yihu/jw/elasticsearch/ElasticSearchPool.java

@@ -1,81 +1,82 @@
package com.yihu.jw.elasticsearch;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.ElasticSearchDruidDataSourceFactory;
import com.yihu.jw.elasticsearch.config.ElasticSearchConfig;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.net.InetSocketAddress;
import java.util.Properties;
/**
 * Created by progr1mmer on 2018/1/4.
 */
@Component
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class ElasticSearchPool {
    private static volatile TransportClient transportClient;
    @Autowired
    private ElasticSearchConfig elasticSearchConfig;
    private TransportClient getTransportClient() {
        Settings settings = Settings.builder()
                .put("cluster.name", elasticSearchConfig.getClusterName())
                .put("client.transport.sniff", false)
                .build();
        String[] nodeArr = elasticSearchConfig.getClusterNodes().split(",");
        InetSocketTransportAddress[] socketArr = new InetSocketTransportAddress[nodeArr.length];
        for (int i = 0; i < socketArr.length; i++) {
            if (!StringUtils.isEmpty(nodeArr[i])) {
                String[] nodeInfo = nodeArr[i].split(":");
                socketArr[i] = new InetSocketTransportAddress(new InetSocketAddress(nodeInfo[0], new Integer(nodeInfo[1])));
            }
        }
        return TransportClient.builder().settings(settings).build().addTransportAddresses(socketArr);
    }
    /**
     * 1.TransportClient本身支持多线程的数据请求
     * 2.移除多个TransportClient的线程池支持,减少Socket链接
     * 3.基于多重检查的单例模式,兼顾安全和效率
     * 4.为提高效率,使用完毕后请勿进行 transportClient.close() 的关闭操作
     * @return
     */
    public TransportClient getClient() {
        if (transportClient != null) {
            if (transportClient.connectedNodes().isEmpty()) {
                synchronized (TransportClient.class) {
                    if (transportClient.connectedNodes().isEmpty()) {
                        transportClient = getTransportClient();
                    }
                }
            }
            return transportClient;
        }
        synchronized (TransportClient.class) {
            if (null == transportClient) {
                transportClient = getTransportClient();
            }
        }
        return transportClient;
    }
    public DruidDataSource getDruidDataSource() throws Exception {
        Properties properties = new Properties();
        properties.put("url", "jdbc:elasticsearch://" + elasticSearchConfig.getClusterNodes() + "/");
        DruidDataSource druidDataSource = (DruidDataSource) ElasticSearchDruidDataSourceFactory
                .createDataSource(properties);
        druidDataSource.setInitialSize(1);
        return druidDataSource;
    }
}
//package com.yihu.jw.elasticsearch;
//
//import com.alibaba.druid.pool.DruidDataSource;
//import com.alibaba.druid.pool.ElasticSearchDruidDataSourceFactory;
//import com.yihu.jw.elasticsearch.config.ElasticSearchConfig;
//import org.elasticsearch.client.transport.TransportClient;
//import org.elasticsearch.common.settings.Settings;
//
//import org.elasticsearch.common.transport.TransportAddress;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.config.ConfigurableBeanFactory;
//import org.springframework.context.annotation.Scope;
//import org.springframework.stereotype.Component;
//import org.springframework.util.StringUtils;
//
//import java.net.InetSocketAddress;
//import java.util.Properties;
//
///**
// * Created by progr1mmer on 2018/1/4.
// */
//@Component
//@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
//public class ElasticSearchPool {
//
//    private static volatile TransportClient transportClient;
//
//    @Autowired
//    private ElasticSearchConfig elasticSearchConfig;
//
//    private TransportClient getTransportClient() {
//        Settings settings = Settings.builder()
//                .put("cluster.name", elasticSearchConfig.getClusterName())
//                .put("client.transport.sniff", false)
//                .build();
//        String[] nodeArr = elasticSearchConfig.getClusterNodes().split(",");
//        TransportAddress[] socketArr = new TransportAddress[nodeArr.length];
//        for (int i = 0; i < socketArr.length; i++) {
//            if (!StringUtils.isEmpty(nodeArr[i])) {
//                String[] nodeInfo = nodeArr[i].split(":");
//                socketArr[i] = new TransportAddress(new InetSocketAddress(nodeInfo[0], new Integer(nodeInfo[1])));
//            }
//        }
//        return TransportClient.builder().settings(settings).build().addTransportAddresses(socketArr);
//    }
//
//    /**
//     * 1.TransportClient本身支持多线程的数据请求
//     * 2.移除多个TransportClient的线程池支持,减少Socket链接
//     * 3.基于多重检查的单例模式,兼顾安全和效率
//     * 4.为提高效率,使用完毕后请勿进行 transportClient.close() 的关闭操作
//     * @return
//     */
//    public TransportClient getClient() {
//        if (transportClient != null) {
//            if (transportClient.connectedNodes().isEmpty()) {
//                synchronized (TransportClient.class) {
//                    if (transportClient.connectedNodes().isEmpty()) {
//                        transportClient = getTransportClient();
//                    }
//                }
//            }
//            return transportClient;
//        }
//        synchronized (TransportClient.class) {
//            if (null == transportClient) {
//                transportClient = getTransportClient();
//            }
//        }
//        return transportClient;
//    }
//
//    public DruidDataSource getDruidDataSource() throws Exception {
//        Properties properties = new Properties();
//        properties.put("url", "jdbc:elasticsearch://" + elasticSearchConfig.getClusterNodes() + "/");
//        DruidDataSource druidDataSource = (DruidDataSource) ElasticSearchDruidDataSourceFactory
//                .createDataSource(properties);
//        druidDataSource.setInitialSize(1);
//        return druidDataSource;
//    }
//
//}

+ 780 - 780
svr/svr-quota/src/main/java/com/yihu/jw/elasticsearch/ElasticSearchUtil.java

@@ -1,780 +1,780 @@
package com.yihu.jw.elasticsearch;
import com.alibaba.druid.pool.DruidDataSource;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.InternalCardinality;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.ParseException;
import java.util.*;
/**
 * Util - Es搜索服务
 * Created by progr1mmer on 2017/12/2.
 */
@Service
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class ElasticSearchUtil {
    @Autowired
    private ElasticSearchPool elasticSearchPool;
    /**
     * 创建映射
     *  注意:保存数据之前如果没有创建相应的字
     *  段映射会导致搜索结果不准确
     * @param index
     * @param type
     * @param source
     * @param setting - 该设置根据需要进行配置
     * @throws IOException
     */
    public void mapping (String index, String type, Map<String, Map<String, String>> source, Map<String, Object> setting) throws IOException{
        TransportClient transportClient = elasticSearchPool.getClient();
        XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("properties");
        for (String field : source.keySet()) {
            xContentBuilder.startObject(field);
            Map<String, String> propsMap = source.get(field);
            for (String prop : propsMap.keySet()) {
                xContentBuilder.field(prop, propsMap.get(prop));
            }
            xContentBuilder.endObject();
        }
        xContentBuilder.endObject().endObject();
        CreateIndexRequestBuilder createIndexRequestBuilder = transportClient.admin().indices().prepareCreate(index);
        createIndexRequestBuilder.addMapping(type, xContentBuilder);
        /*Map<String, Object> settingSource = new HashMap<>();
        settingSource.put("index.translog.flush_threshold_size", "1g"); //log文件大小
        settingSource.put("index.translog.flush_threshold_ops", "100000"); //flush触发次数
        settingSource.put("index.translog.durability", "async"); //异步更新
        settingSource.put("index.refresh_interval", "30s"); //刷新间隔
        settingSource.put("index.number_of_replicas", 1); //副本数
        settingSource.put("index.number_of_shards", 3); //分片数
        createIndexRequestBuilder.setSettings(settingSource);*/
        if (setting != null && !setting.isEmpty()) {
            createIndexRequestBuilder.setSettings(setting);
        }
        createIndexRequestBuilder.get();
    }
    /**
     * 移除索引 - 整个移除
     * @param index
     */
    public void remove (String index){
        TransportClient transportClient = elasticSearchPool.getClient();
        DeleteIndexRequestBuilder deleteIndexRequestBuilder = transportClient.admin().indices().prepareDelete(index);
        deleteIndexRequestBuilder.get();
    }
    /**
     * 添加数据
     * @param index
     * @param type
     * @param source
     * @return
     * @throws ParseException
     */
    public Map<String, Object> index (String index, String type, Map<String, Object> source) throws ParseException{
        TransportClient transportClient = elasticSearchPool.getClient();
        String _id = (String) source.remove("_id");
        if (StringUtils.isEmpty(_id)) {
            IndexResponse response = transportClient.prepareIndex(index, type).setSource(source).get();
            source.put("_id", response.getId());
        } else {
            IndexResponse response = transportClient.prepareIndex(index, type, _id).setSource(source).get();
            source.put("_id", response.getId());
        }
        return source;
    }
    /**
     * 批量添加数据 - 效率高
     * @param index
     * @param type
     * @param source
     * @throws ParseException
     */
    public void bulkIndex (String index, String type, List<Map<String, Object>> source) throws ParseException{
        if (source.size() > 0) {
            TransportClient transportClient = elasticSearchPool.getClient();
            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
            source.forEach(item -> {
                String _id = (String) item.remove("_id");
                if (StringUtils.isEmpty(_id)) {
                    bulkRequestBuilder.add(transportClient.prepareIndex(index, type).setSource(item));
                } else {
                    bulkRequestBuilder.add(transportClient.prepareIndex(index, type, _id).setSource(item));
                }
            });
            bulkRequestBuilder.get();
        }
    }
    /**
     * 删除数据
     * @param index
     * @param type
     * @param id
     */
    public void delete (String index, String type, String id) {
        TransportClient transportClient = elasticSearchPool.getClient();
        transportClient.prepareDelete(index, type, id).get();
    }
    /**
     * 批量删除数据
     * @param index
     * @param type
     * @param idArr
     */
    public void bulkDelete (String index, String type, String [] idArr) {
        if (idArr.length > 0) {
            TransportClient transportClient = elasticSearchPool.getClient();
            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
            for (String id : idArr) {
                bulkRequestBuilder.add(transportClient.prepareDelete(index, type, id));
            }
            bulkRequestBuilder.get();
        }
    }
    /**
     * 根据字段批量删除数据
     * @param index
     * @param type
     * @param field
     * @param value
     */
    public void deleteByField(String index, String type, String field, Object value) {
        deleteByFilter(index, type, field + "=" + value);
    }
    /**
     * 根据条件批量删除数据
     * @param index
     * @param type
     * @param filters
     */
    public void deleteByFilter(String index, String type, String filters) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        deleteByFilter(index, type, queryBuilder);
    }
    /**
     * 根据条件批量删除数据
     * @param index
     * @param type
     * @param queryBuilder
     */
    public void deleteByFilter(String index, String type, QueryBuilder queryBuilder) {
        long count = count(index, type, queryBuilder);
        long page = count/10000 == 0 ? 1 :count/10000 +1;
        for (long i =0;i<page;i++) {
            List<String> idList = getIds(index, type, queryBuilder);
            if (idList.size() > 0) {
                TransportClient transportClient = elasticSearchPool.getClient();
                String[] idArr = new String[idList.size()];
                idArr = idList.toArray(idArr);
                BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
                for (String id : idArr) {
                    bulkRequestBuilder.add(transportClient.prepareDelete(index, type, id));
                }
                bulkRequestBuilder.get();
            }
        }
    }
    /**
     * 更新数据 - 返回最新文档
     * @param index
     * @param type
     * @param id
     * @param source
     * @return
     * @throws DocumentMissingException
     */
    public Map<String, Object> update(String index, String type, String id, Map<String, Object> source) throws DocumentMissingException {
        TransportClient transportClient = elasticSearchPool.getClient();
        source.remove("_id");
        transportClient.prepareUpdate(index, type, id).setDoc(source).setRetryOnConflict(5).get();
        return findById(index, type, id);
    }
    /**
     * 更新数据 - 不返回文档
     * @param index
     * @param type
     * @param id
     * @param source
     * @throws DocumentMissingException
     */
    public void voidUpdate (String index, String type, String id, Map<String, Object> source) throws DocumentMissingException {
        TransportClient transportClient = elasticSearchPool.getClient();
        source.remove("_id");
        transportClient.prepareUpdate(index, type, id).setDoc(source).setRetryOnConflict(5).get();
    }
    /**
     * 批量更新数据
     * @param index
     * @param type
     * @param source
     * @throws DocumentMissingException
     */
    public void bulkUpdate(String index, String type, List<Map<String, Object>> source) throws DocumentMissingException {
        if (source.size() > 0) {
            TransportClient transportClient = elasticSearchPool.getClient();
            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
            source.forEach(item -> {
                String _id = (String)item.remove("_id");
                if (!StringUtils.isEmpty(_id)) {
                    bulkRequestBuilder.add(transportClient.prepareUpdate(index, type, _id).setDoc(item).setRetryOnConflict(5));
                }
            });
            bulkRequestBuilder.get();
        }
    }
    /**
     * 根据ID查找数据
     * @param index
     * @param type
     * @param id
     * @return
     */
    public Map<String, Object> findById(String index, String type, String id) {
        TransportClient transportClient = elasticSearchPool.getClient();
        GetRequest getRequest = new GetRequest(index, type, id);
        GetResponse response = transportClient.get(getRequest).actionGet();
        Map<String, Object> source = response.getSource();
        if (source != null) {
            source.put("_id", response.getId());
        }
        return source;
    }
    /**
     * 根据字段查找数据
     * @param index
     * @param type
     * @param field
     * @param value
     * @return
     */
    public List<Map<String, Object>> findByField(String index, String type, String field, Object value) {
        return list(index, type, field + "=" + value);
    }
    /**
     * 获取文档列表
     * @param index
     * @param type
     * @param filters
     * @return
     */
    public List<Map<String, Object>> list(String index, String type, String filters) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        return list(index, type, queryBuilder);
    }
    /**
     * 获取文档列表
     * @param index
     * @param type
     * @param queryBuilder
     * @return
     */
    public List<Map<String, Object>> list(String index, String type, QueryBuilder queryBuilder) {
        int size = (int)count(index, type, queryBuilder);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, 0, size);
        SearchResponse response = builder.get();
        SearchHits hits = response.getHits();
        List<Map<String, Object>> resultList = new ArrayList<Map<String, Object>>();
        for (SearchHit hit : hits.getHits()) {
            Map<String, Object> source = hit.getSource();
            source.put("_id", hit.getId());
            resultList.add(source);
        }
        return resultList;
    }
    /**
     * 获取文档分页
     * @param index
     * @param type
     * @param filters
     * @param page
     * @param size
     * @return
     */
    public Page<Map<String, Object>> page(String index, String type, String filters, int page, int size) {
        return page(index, type, filters, null, page, size);
    }
    /**
     * 获取文档分页
     * @param index
     * @param type
     * @param filters
     * @param sorts
     * @param page
     * @param size
     * @return
     */
    public Page<Map<String, Object>> pageBySort(String index, String type, String filters, String sorts, int page, int size) {
        return page(index, type, filters, sorts, page, size);
    }
    /**
     * 获取文档分页 - 带分页功能
     * @param index
     * @param type
     * @param filters
     * @param sorts
     * @param page
     * @param size
     * @return
     */
    public Page<Map<String, Object>> page(String index, String type, String filters, String sorts, int page, int size) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        List<SortBuilder> sortBuilders = getSortBuilder(sorts);
        return page(index, type, queryBuilder, sortBuilders, page, size);
    }
    /**
     * 获取文档分页 - 带分页功能
     * @param index
     * @param type
     * @param queryBuilder
     * @param sortBuilders
     * @param page
     * @param size
     * @return
     */
    public Page<Map<String, Object>> page(String index, String type, QueryBuilder queryBuilder, List<SortBuilder> sortBuilders, int page, int size) {
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, sortBuilders, (page - 1) * size, size);
        SearchResponse response = builder.get();
        SearchHits hits = response.getHits();
        List<Map<String, Object>> resultList = new ArrayList<>();
        for (SearchHit hit : hits.getHits()) {
            Map<String, Object> source = hit.getSource();
            source.put("_id", hit.getId());
            resultList.add(source);
        }
        PageRequest pageRequest = PageRequest.of(page-1,size);
        return new PageImpl<>(resultList, pageRequest, hits.totalHits());
    }
    /**
     * 获取ID列表
     * @param index
     * @param type
     * @param filters
     * @return
     */
    public List<String> getIds (String index, String type, String filters){
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        return getIds(index, type, queryBuilder);
    }
    /**
     * Get ID list (at most 10000 ids are returned per call)
     * @param index
     * @param type
     * @param queryBuilder
     * @return
     */
    public List<String> getIds (String index, String type, QueryBuilder queryBuilder) {
        int size = (int)count(index, type, queryBuilder);
        size = size > 10000 ? 10000:size;
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, 0, size);
        SearchResponse response = builder.get();
        SearchHits hits = response.getHits();
        List<String> resultList = new ArrayList<>();
        for (SearchHit hit : hits.getHits()) {
            resultList.add(hit.getId());
        }
        return resultList;
    }
    /**
     * 获取文档数
     * @param index
     * @param type
     * @param filters
     * @return
     */
    public long count(String index, String type, String filters) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        return count(index, type, queryBuilder);
    }
    /**
     * 获取文档数
     * @param index
     * @param type
     * @param queryBuilder
     * @return
     */
    public long count(String index, String type, QueryBuilder queryBuilder) {
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, null, null);
        return builder.get().getHits().totalHits();
    }
    /**
     * 根据SQL查找数据
     * @param field
     * @param sql
     * @return
     * @throws Exception
     */
    public List<Map<String, Object>> findBySql(List<String> field, String sql) throws Exception {
        List<Map<String, Object>> list = new ArrayList<>();
        DruidDataSource druidDataSource = null;
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            druidDataSource = elasticSearchPool.getDruidDataSource();
            connection = druidDataSource.getConnection();
            preparedStatement = connection.prepareStatement(sql);
            resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                Map<String, Object> rowData = new HashMap<>();
                for (String _field : field) {
                    rowData.put(_field, resultSet.getObject(_field));
                }
                list.add(rowData);
            }
            return list;
        } catch (Exception e) {
            if (!"Error".equals(e.getMessage())){
                e.printStackTrace();
            }
            return new ArrayList<>();
        } finally {
            if (resultSet != null) {
                resultSet.close();
            }
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (connection != null) {
                connection.close();
            }
            if (druidDataSource != null) {
                druidDataSource.close();
            }
        }
    }
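    /*
     * Usage sketch (illustrative only): assumes the Druid pool in ElasticSearchPool fronts an
     * ES SQL endpoint and that "hypothetical_index", "town" and "createTime" are made-up names.
     * The field list must match the column labels returned by the SQL statement.
     *
     *   List<String> fields = Arrays.asList("town", "createTime");
     *   List<Map<String, Object>> rows = esUtil.findBySql(fields,
     *           "SELECT town, createTime FROM hypothetical_index WHERE del=1 LIMIT 100");
     */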
    /**
     * Query by SQL and return the raw ResultSet.
     * Note: the ResultSet (together with its Statement and Connection) must be closed by the
     * caller; closing them in a finally block here would hand back an already-closed, unusable
     * ResultSet. The pooled DruidDataSource is assumed to be managed by ElasticSearchPool itself.
     * @param sql
     * @return
     * @throws Exception
     */
    public ResultSet findBySql(String sql) throws Exception {
        DruidDataSource druidDataSource = elasticSearchPool.getDruidDataSource();
        Connection connection = druidDataSource.getConnection();
        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        return preparedStatement.executeQuery();
    }
    /**
     * 根据日期分组
     * @param index
     * @param type
     * @param filters
     * @param start
     * @param end
     * @param field
     * @param interval
     * @param format
     * @return
     */
    public Map<String, Long> dateHistogram(String index, String type, String filters, Date start, Date end, String field, DateHistogramInterval interval, String format) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, 0, 0);
        DateHistogramBuilder dateHistogramBuilder = new DateHistogramBuilder(index + "-" + field);
        dateHistogramBuilder.field(field);
        dateHistogramBuilder.interval(interval);
        if (!StringUtils.isEmpty(format)) {
            dateHistogramBuilder.format(format);
        }
        dateHistogramBuilder.minDocCount(0);
        dateHistogramBuilder.extendedBounds(start.getTime(), end.getTime());
        builder.addAggregation(dateHistogramBuilder);
        SearchResponse response = builder.get();
        Histogram histogram = response.getAggregations().get(index + "-" + field);
        Map<String, Long> temp = new HashMap<>();
        histogram.getBuckets().forEach(item -> temp.put(item.getKeyAsString(), item.getDocCount()));
        return temp;
    }
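    /*
     * Usage sketch (illustrative only; "hypothetical_index", "event", "del" and "createTime"
     * are made-up names): daily counts between two dates, keyed by the formatted bucket date.
     *
     *   Date start = ...; // e.g. 2022-01-01 00:00:00
     *   Date end = ...;   // e.g. 2022-01-31 23:59:59
     *   Map<String, Long> perDay = esUtil.dateHistogram("hypothetical_index", "event", "del=1",
     *           start, end, "createTime", DateHistogramInterval.DAY, "yyyy-MM-dd");
     */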
    /**
     * 查询去重数量
     * @param index
     * @param type
     * @param filters
     * @param filed
     * @return
     */
    public int cardinality(String index, String type, String filters, String filed){
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, 0, 0);
        CardinalityBuilder cardinality = AggregationBuilders.cardinality("cardinality").field(filed);
        builder.addAggregation(cardinality);
        SearchResponse response = builder.get();
        InternalCardinality internalCard = response.getAggregations().get("cardinality");
        return new Double(internalCard.getProperty("value").toString()).intValue();
    }
    /**
     * 分组统计
     * @param index
     * @param type
     * @param filters
     * @param groupField
     * @return
     */
    public Map<String, Long> countByGroup(String index, String type, String filters, String groupField) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, null, null);
        AbstractAggregationBuilder aggregation = AggregationBuilders.terms("count").field(groupField);
        builder.addAggregation(aggregation);
        SearchResponse response = builder.get();
        Terms terms = response.getAggregations().get("count");
        List<Terms.Bucket> buckets = terms.getBuckets();
        Map<String, Long> groupMap = new HashMap<>();
        for (Terms.Bucket bucket : buckets) {
            //System.out.println(bucket.getKey()+"----"+bucket.getDocCount());
            groupMap.put(bucket.getKey().toString(), bucket.getDocCount());
        }
        return groupMap;
    }
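    /*
     * Usage sketch (illustrative only; names are made up): document count per "town" for
     * non-deleted records. The group field should normally be a keyword / not_analyzed field,
     * otherwise the terms aggregation groups on analyzed tokens.
     *
     *   Map<String, Long> countPerTown =
     *           esUtil.countByGroup("hypothetical_index", "event", "del=1", "town");
     */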
    /**
     * 分组求和
     * @param index
     * @param type
     * @param filters
     * @param sumField
     * @param groupField
     * @return
     */
    public Map<String, Double> sumByGroup(String index, String type, String filters, String sumField, String groupField) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, null, null);
        TermsBuilder aggregation = AggregationBuilders.terms("sum_query").field(groupField);
        SumBuilder sumBuilder= AggregationBuilders.sum("sum_row").field(sumField);
        aggregation.subAggregation(sumBuilder);
        builder.addAggregation(aggregation);
        SearchResponse response = builder.get();
        Terms terms = response.getAggregations().get("sum_query");
        List<Terms.Bucket> buckets = terms.getBuckets();
        Map<String, Double> groupMap = new HashMap<>();
        for (Terms.Bucket bucket : buckets){
            Sum sum2 = bucket.getAggregations().get("sum_row");
            groupMap.put(bucket.getKey().toString(), sum2.getValue());
        }
        return groupMap;
    }
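    /*
     * Usage sketch (illustrative only; names are made up): sum of a numeric "fee" field per "town".
     *
     *   Map<String, Double> feePerTown = esUtil.sumByGroup("hypothetical_index", "event",
     *           "del=1;createTime>=2022-01-01", "fee", "town");
     */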
    /**
     * 获取基础请求生成器
     * @param index
     * @param type
     * @param queryBuilder
     * @param sortBuilders
     * @return
     */
    public SearchRequestBuilder searchRequestBuilder(String index, String type, QueryBuilder queryBuilder, List<SortBuilder> sortBuilders, Integer from, Integer size) {
        TransportClient transportClient = elasticSearchPool.getClient();
        SearchRequestBuilder builder = transportClient.prepareSearch(index);
        builder.setTypes(type);
        builder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
        builder.setQuery(queryBuilder);
        builder.setExplain(true);
        if (sortBuilders != null) {
            sortBuilders.forEach(item -> builder.addSort(item));
        }
        if (from != null) {
            builder.setFrom(from);
        }
        if (size != null) {
            builder.setSize(size);
        }
        return builder;
    }
    /**
     * 排序语句转换
     * @param sorts
     * @return
     */
    public List<SortBuilder> getSortBuilder(String sorts) {
        List<SortBuilder> sortBuilderList = new ArrayList<>();
        if (StringUtils.isEmpty(sorts)) {
            return sortBuilderList;
        }
        String [] sortArr = sorts.split(";");
        for (String sort : sortArr) {
            String operator = sort.substring(0, 1);
            SortBuilder sortBuilder = new FieldSortBuilder(sort.substring(1));
            if ("-".equalsIgnoreCase(operator.trim())) {
                sortBuilder.order(SortOrder.DESC);
            } else if ("+".equalsIgnoreCase(operator.trim())) {
                sortBuilder.order(SortOrder.ASC);
            } else {
                sortBuilder.order(SortOrder.DESC);
            }
            sortBuilderList.add(sortBuilder);
        }
        return sortBuilderList;
    }
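    /*
     * Sort expression format accepted by getSortBuilder (multiple entries separated by ';'):
     *   "-createTime"        -> createTime DESC
     *   "+name"              -> name ASC
     *   "-createTime;+name"  -> createTime DESC, then name ASC
     * The first character of each entry is always consumed as the direction flag; anything
     * other than '+' defaults to DESC.
     */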
    /**
     * 查询语句转换
     * @param filters
     * @return
     */
    public QueryBuilder getQueryBuilder(String filters) {
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (StringUtils.isEmpty(filters)) {
            return boolQueryBuilder;
        }
        String [] filterArr = filters.split(";");
        for (String filter : filterArr) {
            if (filter.contains("||")){
                String [] fields = filter.split("\\|\\|");
                BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
                for (String filed : fields) {
                    String [] condition = filed.split("=");
                    if ("null".equals(condition[1])) {
                        condition[1] ="";
                    }
                    queryBuilder.should(QueryBuilders.termQuery(condition[0], condition[1]));
                }
                boolQueryBuilder.must(queryBuilder);
            } else if (filter.contains("?")) {
                String [] condition = filter.split("\\?");
                if ("null".equals(condition[1])) {
                    condition[1] ="";
                }
                MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchPhraseQuery(condition[0], condition[1]);
                boolQueryBuilder.must(matchQueryBuilder);
            } else if (filter.contains("<>")) {
                String [] condition = filter.split("<>");
                if (condition[1].contains(",")) {
                    String [] inCondition = condition[1].split(",");
                    TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(condition[0], inCondition);
                    boolQueryBuilder.mustNot(termsQueryBuilder);
                } else {
                    if ("null".equals(condition[1])) {
                        condition[1] ="";
                    }
                    TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(condition[0], condition[1]);
                    boolQueryBuilder.mustNot(termQueryBuilder);
                }
            } else if (filter.contains(">=")) {
                String [] condition = filter.split(">=");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.gte(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains(">")) {
                String [] condition = filter.split(">");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.gt(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains("<=")) {
                String [] condition = filter.split("<=");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.lte(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains("<")) {
                String [] condition = filter.split("<");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.lt(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains("=")) {
                String [] condition = filter.split("=");
                if (condition[1].contains(",")) {
                    String [] inCondition = condition[1].split(",");
                    TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(condition[0], inCondition);
                    boolQueryBuilder.must(termsQueryBuilder);
                } else {
                    if ("null".equals(condition[1])) {
                        condition[1] = "";
                    }
                    TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(condition[0], condition[1]);
                    boolQueryBuilder.must(termQueryBuilder);
                }
            }
        }
        return boolQueryBuilder;
    }
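    /*
     * Filter expression grammar accepted by getQueryBuilder (';' joins conditions with AND;
     * field names below are illustrative):
     *   "status=1"                    -> term query (equals)
     *   "status=1,2,3"                -> terms query (IN)
     *   "status<>1" / "status<>1,2"   -> must_not term / terms (NOT / NOT IN)
     *   "name?zhang"                  -> match_phrase query on an analyzed field
     *   "age>=18;age<65"              -> range queries (>=, >, <=, < are all supported)
     *   "town=350200||town=350203"    -> '||' combines term conditions with OR
     * The literal value "null" is converted to an empty string before querying, e.g.:
     *
     *   QueryBuilder qb = getQueryBuilder("del=1;createTime>=2022-01-01;town=350200,350203");
     */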
}

+ 5 - 1
svr/svr-quota/src/main/java/com/yihu/jw/elasticsearch/config/ElasticSearchConfig.java

@ -1,3 +1,4 @@
/*
package com.yihu.jw.elasticsearch.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ -5,9 +6,11 @@ import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
*/
/**
 * Created by progr1mmer on 2017/12/1.
 */
 *//*
@ConfigurationProperties(prefix = "elasticsearch")
@Configuration
@ -43,3 +46,4 @@ public class ElasticSearchConfig {
        System.out.println("Elasticsearch.configInfo : " + info.toString());
    }
}
*/

+ 65 - 103
svr/svr-quota/src/main/java/com/yihu/jw/quota/controller/ElasticSearchController.java

@ -1,13 +1,9 @@
package com.yihu.jw.quota.controller;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.jw.elasticsearch.ElasticSearchUtil;
import com.yihu.jw.elasticsearch.config.ElasticSearchConfig;
import com.yihu.jw.quota.etl.util.ElasticsearchUtil;
import com.yihu.jw.quota.etl.util.EsClientUtil;
import com.yihu.jw.elasticsearch.ElasticSearch7Helper;
import com.yihu.jw.elasticsearch.ElasticSearch7Util;
import com.yihu.jw.restmodel.web.MixEnvelop;
import com.yihu.jw.quota.vo.PersonalInfoModel;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@ -16,6 +12,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@ -39,13 +36,9 @@ public class ElasticSearchController extends BaseController {
    @Autowired
    private  ObjectMapper objectMapper;
    @Autowired
    private EsClientUtil esClientUtil;
    private ElasticSearch7Util elasticsearchUtil;
    @Autowired
    private ElasticsearchUtil elasticsearchUtil;
    @Autowired
    private ElasticSearchConfig elasticSearchConfig;
    @Autowired
    private ElasticSearchUtil elasticSearchUtil;
    private ElasticSearch7Helper elasticSearch7Helper;
    @ApiOperation("sql查询es")
    @RequestMapping(value = "/open/getEsInfoBySql", method = RequestMethod.GET)
@ -58,7 +51,7 @@ public class ElasticSearchController extends BaseController {
            return envelop;
        }
        try {
            List<Map<String, Object>> listData = elasticsearchUtil.excuteDataModel(sql);
            List<Map<String, Object>> listData = elasticsearchUtil.executeSQL(sql);
            envelop.setDetailModelList(listData);
            envelop.setObj(listData.size());
            envelop.setStatus(200);
@ -86,7 +79,7 @@ public class ElasticSearchController extends BaseController {
            if (StringUtils.isNotEmpty(whereCondition)) {
                sb.append(" where ").append(whereCondition).append(" limit 10000");
            }
            List<Map<String, Object>> listData = elasticsearchUtil.excuteDataModel(sb.toString());
            List<Map<String, Object>> listData = elasticsearchUtil.executeSQL(sb.toString());
            envelop.setDetailModelList(listData);
            envelop.setObj(listData.size());
            envelop.setStatus(200);
@ -154,7 +147,7 @@ public class ElasticSearchController extends BaseController {
    @RequestMapping(value = "/saveElasticsearchDocument", method = RequestMethod.POST)
    @ApiOperation("添加elasticsearch文档")
    public String saveDocument(
    public RestStatus saveDocument(
            @ApiParam(value = "json串")
            @RequestParam(value = "jsonString", required = true) String jsonString,
            @ApiParam(name = "index", value = "索引名称")
@ -162,23 +155,14 @@ public class ElasticSearchController extends BaseController {
            @ApiParam(name = "type", value = "类型(表)名称")
            @RequestParam(name = "type",required = true) String type
    ){
        boolean f = false;
        RestStatus f = RestStatus.INTERNAL_SERVER_ERROR; // default when the save fails; RestStatus.valueOf("") would throw IllegalArgumentException
        try {
           String clusterName = elasticSearchConfig.getClusterName();
           String nodeStr = elasticSearchConfig.getClusterNodes();
            if(StringUtils.isNotEmpty(nodeStr)){
                String [] nodes = nodeStr.split(",");
                if(nodes.length > 1){
                    String ip = nodes[0].substring(0,nodes[0].indexOf(":")-1);
                    Client client = esClientUtil.getClient(ip, 9300, clusterName);
                    f = elasticsearchUtil.save(client,index,type,jsonString);
                    client.close();
                }
            }
            f = elasticSearch7Helper.save(index,jsonString);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return  String.valueOf(f);
        return f;
    }
    @RequestMapping(value = "/impFileDocument", method = RequestMethod.POST)
@ -191,18 +175,9 @@ public class ElasticSearchController extends BaseController {
            @ApiParam(name = "type", value = "类型(表)名称")
            @RequestParam(name = "type",required = true) String type
    )throws Exception {
        boolean f = false;
        RestStatus f = RestStatus.INTERNAL_SERVER_ERROR; // default when the import fails; RestStatus.valueOf("") would throw IllegalArgumentException
        try {
            Client client = null ;
            String clusterName = elasticSearchConfig.getClusterName();
            String nodeStr = elasticSearchConfig.getClusterNodes();
            if(StringUtils.isNotEmpty(nodeStr)){
                String [] nodes = nodeStr.split(",");
                if(nodes.length > 1){
                    String ip = nodes[0].substring(0,nodes[0].indexOf(":")-1);
                    client = esClientUtil.getClient(ip, 9300, clusterName);
                }
            }
            if( !file.isEmpty()){
                FileInputStream fis = null;
                InputStreamReader isr = null;
@ -218,7 +193,7 @@ public class ElasticSearchController extends BaseController {
                        jsonString = str;
                        System.out.println(jsonString);// 打印
                        //添加到es库
                        f = elasticsearchUtil.save(client,index,type,jsonString);
                        f = elasticSearch7Helper.save(index,jsonString);
                    }
                } catch (FileNotFoundException e) {
                    System.out.println("找不到指定文件");
@ -255,16 +230,14 @@ public class ElasticSearchController extends BaseController {
        boolean f = false;
        try {
            Client client = null ;
            String clusterName = elasticSearchConfig.getClusterName();
            String nodeStr = elasticSearchConfig.getClusterNodes();
            if(StringUtils.isNotEmpty(nodeStr)){
      /*      if(StringUtils.isNotEmpty(nodeStr)){
                String [] nodes = nodeStr.split(",");
                if(nodes.length > 1){
                    String ip = nodes[0].substring(0,nodes[0].indexOf(":")-1);
                    client = esClientUtil.getClient(ip, 9300, clusterName);
                }
            }
            List<Map<String, Object>> list = elasticsearchUtil.queryList(client,index,type, null, null, 10000);
            }*/
            List<Map<String, Object>> list = elasticsearchUtil.queryList(index, null, null, 10000);
            byte[] buff = new byte[]{};
            StringBuffer docmBuff = new StringBuffer();
            for(Map<String, Object> map:list){
@ -311,20 +284,18 @@ public class ElasticSearchController extends BaseController {
        List<Map<String, Object>> list = null;
        try {
            Client client = null ;
            String clusterName = elasticSearchConfig.getClusterName();
            String nodeStr = elasticSearchConfig.getClusterNodes();
            if(StringUtils.isNotEmpty(nodeStr)){
           /* if(StringUtils.isNotEmpty(nodeStr)){
                String [] nodes = nodeStr.split(",");
                if(nodes.length > 1){
                    String ip = nodes[0].substring(0,nodes[0].indexOf(":")-1);
                    client = esClientUtil.getClient(ip, 9300, clusterName);
                }
            }
            }*/
            BoolQueryBuilder boolQueryBuilder =  QueryBuilders.boolQuery();
            TermQueryBuilder termQueryQuotaCode = QueryBuilders.termQuery(term, value);
            boolQueryBuilder.must(termQueryQuotaCode);
            list = elasticsearchUtil.queryList(client,index,type, boolQueryBuilder, null, 200);
            list = elasticsearchUtil.queryList(index, boolQueryBuilder, null, 200);
//            client.close(); // the Client is no longer created above; closing it here would throw a NullPointerException
        } catch (Exception ex) {
            ex.printStackTrace();
@ -334,25 +305,16 @@ public class ElasticSearchController extends BaseController {
    @RequestMapping(value = "/elasticSearch/addElasticSearch", method = RequestMethod.POST)
    @ApiOperation("elasticsearch文档数据")
    public Boolean addElasticSearch(
    public String addElasticSearch(
            @ApiParam(name = "index", value = "索引名称")
            @RequestParam(value = "index") String index,
            @ApiParam(name = "type", value = "索引类型")
            @RequestParam(value = "type") String type,
            @ApiParam(name = "sourceList", value = "值")
            @RequestParam(value = "sourceList") String sourceList) throws Exception {
        boolean f = false;
        RestStatus f = RestStatus.INTERNAL_SERVER_ERROR; // default when the save fails; RestStatus.valueOf("") would throw IllegalArgumentException
        try {
            Client client = null ;
            String clusterName = elasticSearchConfig.getClusterName();
            String nodeStr = elasticSearchConfig.getClusterNodes();
            if(StringUtils.isNotEmpty(nodeStr)){
                String [] nodes = nodeStr.split(",");
                if(nodes.length > 1){
                    String ip = nodes[0].substring(0,nodes[0].indexOf(":")-1);
                    client = esClientUtil.getClient(ip, 9300, clusterName);
                }
            }
            InputStream fis = null;
            InputStreamReader isr = null;
            BufferedReader br = null; //用于包装InputStreamReader,提高处理性能。因为BufferedReader有缓冲的,而InputStreamReader没有。
@ -368,7 +330,7 @@ public class ElasticSearchController extends BaseController {
                    jsonString = str;
                    System.out.println(jsonString);// 打印
                    //添加到es库
                    f = elasticsearchUtil.save(client,index,type,jsonString);
                    f = elasticSearch7Helper.save(index,jsonString);
                }
            } catch (FileNotFoundException e) {
                System.out.println("找不到指定文件");
@ -388,49 +350,49 @@ public class ElasticSearchController extends BaseController {
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return  f;
        return  String.valueOf(f);
    }
    @RequestMapping(value = "/elasticSearch/testQueryElasticSearch", method = RequestMethod.POST)
    @ApiOperation("测试查询数据")
    public void addElasticSearch(
            @ApiParam(name = "data", value = "参数")
            @RequestParam(value = "data") String data
    ){
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        String index = "singleDiseasePersonal";
        String type = "personal_info";
        int i =0;
        while(i<2){
            PersonalInfoModel personalInfo = new PersonalInfoModel();
            personalInfo.setDisease("HP0047");
            personalInfo.setDiseaseName("糖尿病");
            personalInfo.setDemographicId(data);
            try {
                String sql = "SELECT count(demographicId) FROM singleDiseasePersonal where demographicId ="+data+" group by demographicId ";
                long count2 = elasticsearchUtil.getCountBySql(sql);
                System.out.println("结果条数 count2="+count2);
//                List<Map<String, Object>> relist = elasticSearchUtil.findByField(index, type, "demographicId", data);
//                List<Map<String, Object>> filter = new ArrayList<>();
//                Map<String,Object> paramMap = new HashMap<>();
//                paramMap.put("demographicId",data);
//                filter.add(paramMap);
//               long count = elasticSearchUtil.count(index, type,filter);
//                System.out.println("结果条数 count="+count);
//                System.out.println("结果条数="+ relist.size());
//                if(relist== null || relist.size() ==0){
                if(count2 == 0){
                    Map<String, Object> source = new HashMap<>();
                    String jsonPer = objectMapper.writeValueAsString(personalInfo);
                    source = objectMapper.readValue(jsonPer, Map.class);
                    elasticSearchUtil.index(index,type, source);
                }
                i++;
            }catch (Exception e){
                e.getMessage();
            }
        }
    }
//    @RequestMapping(value = "/elasticSearch/testQueryElasticSearch", method = RequestMethod.POST)
//    @ApiOperation("测试查询数据")
//    public void addElasticSearch(
//            @ApiParam(name = "data", value = "参数")
//            @RequestParam(value = "data") String data
//    ){
//        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
//        String index = "singleDiseasePersonal";
//        String type = "personal_info";
//        int i =0;
//        while(i<2){
//            PersonalInfoModel personalInfo = new PersonalInfoModel();
//            personalInfo.setDisease("HP0047");
//            personalInfo.setDiseaseName("糖尿病");
//            personalInfo.setDemographicId(data);
//            try {
//                String sql = "SELECT count(demographicId) FROM singleDiseasePersonal where demographicId ="+data+" group by demographicId ";
//                long count2 = elasticsearchUtil.getCountBySql(sql);
//                System.out.println("结果条数 count2="+count2);
//
////                List<Map<String, Object>> relist = elasticSearchUtil.findByField(index, type, "demographicId", data);
////                List<Map<String, Object>> filter = new ArrayList<>();
////                Map<String,Object> paramMap = new HashMap<>();
////                paramMap.put("demographicId",data);
////                filter.add(paramMap);
////               long count = elasticSearchUtil.count(index, type,filter);
////                System.out.println("结果条数 count="+count);
////                System.out.println("结果条数="+ relist.size());
////                if(relist== null || relist.size() ==0){
//                if(count2 == 0){
//                    Map<String, Object> source = new HashMap<>();
//                    String jsonPer = objectMapper.writeValueAsString(personalInfo);
//                    source = objectMapper.readValue(jsonPer, Map.class);
//                    elasticSearchUtil.index(index,type, source);
//                }
//                i++;
//            }catch (Exception e){
//                e.getMessage();
//            }
//        }
//    }
}
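
Note on the addElasticSearch flow above: it reads one JSON document per line and hands the string to elasticSearch7Helper.save(index, jsonString). A minimal, self-contained sketch of the same pattern against the 7.x high-level REST client is below; the host, file path and index name are placeholders rather than values from this commit.

    import java.io.BufferedReader;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.xcontent.XContentType;

    public class JsonLinesIndexer {
        public static void main(String[] args) throws Exception {
            try (RestHighLevelClient client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
                 BufferedReader reader = Files.newBufferedReader(Paths.get("data.jsonl"), StandardCharsets.UTF_8)) {
                String line;
                while ((line = reader.readLine()) != null) {              // one JSON document per line
                    IndexRequest request = new IndexRequest("my_index")   // placeholder index name
                            .source(line, XContentType.JSON);             // the line is already JSON
                    client.index(request, RequestOptions.DEFAULT);
                }
            }
        }
    }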

+ 14 - 13
svr/svr-quota/src/main/java/com/yihu/jw/quota/controller/ElasticSearchEndPoint.java

@ -1,6 +1,6 @@
package com.yihu.jw.quota.controller;
import com.yihu.jw.elasticsearch.ElasticSearchUtil;
import com.yihu.jw.elasticsearch.ElasticSearch7Util;
import com.yihu.jw.restmodel.web.Envelop;
import com.yihu.jw.restmodel.web.endpoint.EnvelopRestEndpoint;
import com.yihu.jw.quota.constants.ServiceApi;
@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@ -31,7 +32,7 @@ import java.util.Map;
public class ElasticSearchEndPoint extends EnvelopRestEndpoint {
    @Autowired
    private ElasticSearchUtil elasticSearchUtil;
    private ElasticSearch7Util elasticSearchUtil;
    @RequestMapping(value = ServiceApi.ElasticSearch.Mapping, method = RequestMethod.POST)
    @ApiOperation(value = "建立映射")
@ -49,7 +50,7 @@ public class ElasticSearchEndPoint extends EnvelopRestEndpoint {
        if (setting != null) {
            _setting = objectMapper.readValue(setting, Map.class);
        }
        elasticSearchUtil.mapping(index, type, _mapping, _setting);
        elasticSearchUtil.mapping(index,_mapping, _setting);
        return success(true);
    }
@ -64,7 +65,7 @@ public class ElasticSearchEndPoint extends EnvelopRestEndpoint {
            @RequestParam(value = "source") String source) throws Exception {
        Map<String, Object> result;
        Map<String, Object> sourceMap = objectMapper.readValue(source, Map.class);
        result = elasticSearchUtil.index(index, type, sourceMap);
        result = elasticSearchUtil.index(index, sourceMap);
        return success(result);
    }
@ -76,8 +77,8 @@ public class ElasticSearchEndPoint extends EnvelopRestEndpoint {
            @ApiParam(name = "type", value = "索引类型", required = true)
            @RequestParam(value = "type") String type,
            @ApiParam(name = "id", value = "id(多个id值以,分隔)", required = true)
            @RequestParam(value = "id") String id) {
        elasticSearchUtil.bulkDelete(index, type, id.split(","));
            @RequestParam(value = "id") String id) throws IOException {
        elasticSearchUtil.bulkDelete(index, id.split(","));
        return success(true);
    }
@ -101,7 +102,7 @@ public class ElasticSearchEndPoint extends EnvelopRestEndpoint {
            @ApiParam(name = "startDate", value = "时间字段开始时间,时间格式如:2018-01-12", required = false)
            @RequestParam(value = "startDate", required = false) String startDate,
            @ApiParam(name = "endDate", value = "时间字段结束时间,时间格式如:2018-01-12", required = false)
            @RequestParam(value = "endDate", required = false) String endDate) {
            @RequestParam(value = "endDate", required = false) String endDate) throws IOException {
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(field, value);
@ -116,7 +117,7 @@ public class ElasticSearchEndPoint extends EnvelopRestEndpoint {
            RangeQueryBuilder rangeQueryEndTime = QueryBuilders.rangeQuery(dateField).lte(endDate);
            boolQueryBuilder.must(rangeQueryEndTime);
        }
        elasticSearchUtil.deleteByFilter(index, type, boolQueryBuilder);
        elasticSearchUtil.deleteByFilter(index, boolQueryBuilder);
        return success(true);
    }
@ -132,7 +133,7 @@ public class ElasticSearchEndPoint extends EnvelopRestEndpoint {
            @ApiParam(name = "source", value = "值", required = true)
            @RequestParam(value = "source") String source) throws Exception {
        Map<String, Object> sourceMap = objectMapper.readValue(source, Map.class);
        Map<String, Object> result = elasticSearchUtil.update(index, type, id, sourceMap);
        Map<String, Object> result = elasticSearchUtil.update(index, id, sourceMap);
        if (result != null) {
            return success(result);
        }
@ -147,8 +148,8 @@ public class ElasticSearchEndPoint extends EnvelopRestEndpoint {
            @ApiParam(name = "type", value = "索引类型", required = true)
            @RequestParam(value = "type") String type,
            @ApiParam(name = "id", value = "id", required = true)
            @PathVariable(value = "id") String id) {
        Map<String, Object> result = elasticSearchUtil.findById(index, type, id);
            @PathVariable(value = "id") String id) throws IOException {
        Map<String, Object> result = elasticSearchUtil.findById(index, id);
        return success(result);
    }
@ -162,8 +163,8 @@ public class ElasticSearchEndPoint extends EnvelopRestEndpoint {
            @ApiParam(name = "field", value = "字段", required = true)
            @RequestParam(value = "field") String field,
            @ApiParam(name = "value", value = "字段值", required = true)
            @RequestParam(value = "value") Object value) {
        List<Map<String, Object>> resultList = elasticSearchUtil.findByField(index, type, field, value);
            @RequestParam(value = "value") Object value) throws IOException {
        List<Map<String, Object>> resultList = elasticSearchUtil.findByField(index, field, value);
        return success(resultList);
    }
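
The deleteByFilter endpoint above assembles its query from a terms filter plus an optional date range before handing it to elasticSearchUtil.deleteByFilter(index, query). A small sketch of that query construction, with illustrative field names:

    import org.elasticsearch.index.query.BoolQueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;

    public class DeleteFilterExample {
        public static BoolQueryBuilder buildFilter(String field, Object[] values,
                                                   String dateField, String startDate, String endDate) {
            BoolQueryBuilder bool = QueryBuilders.boolQuery()
                    .must(QueryBuilders.termsQuery(field, values));   // match any of the given values
            if (startDate != null) {
                bool.must(QueryBuilders.rangeQuery(dateField).gte(startDate));
            }
            if (endDate != null) {
                bool.must(QueryBuilders.rangeQuery(dateField).lte(endDate));
            }
            return bool;
        }
    }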

+ 2 - 2
svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/extract/ExtractPercentHelper.java

@ -1,6 +1,7 @@
package com.yihu.jw.quota.etl.extract;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.jw.elasticsearch.ElasticSearch7Helper;
import com.yihu.jw.entity.ehr.quota.TjQuota;
import com.yihu.jw.entity.ehr.quota.TjQuotaDataSource;
import com.yihu.jw.entity.ehr.quota.TjQuotaDimensionMain;
@ -13,7 +14,6 @@ import com.yihu.jw.quota.service.quota.QuotaService;
import com.yihu.jw.quota.service.source.TjDataSourceService;
import com.yihu.jw.quota.etl.model.EsConfig;
import com.yihu.jw.quota.etl.util.ElasticsearchUtil;
import com.yihu.jw.quota.etl.util.EsClientUtil;
import com.yihu.jw.quota.vo.DictModel;
import com.yihu.jw.quota.vo.QuotaVo;
import com.yihu.jw.quota.vo.SaveModel;
@ -47,7 +47,7 @@ public class ExtractPercentHelper {
    @Autowired
    private ElasticsearchUtil elasticsearchUtil;
    @Autowired
    EsClientUtil esClientUtil;
    ElasticSearch7Helper esClientUtil;
    @Autowired
    ObjectMapper objectMapper;

+ 2 - 2
svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/extract/es/EsExtract.java

@ -1,5 +1,6 @@
package com.yihu.jw.quota.etl.extract.es;
import com.yihu.jw.elasticsearch.ElasticSearch7Helper;
import com.yihu.jw.entity.ehr.quota.TjQuotaDimensionMain;
import com.yihu.jw.entity.ehr.quota.TjQuotaDimensionSlave;
import com.yihu.jw.restmodel.ehr.org.MOrganization;
@ -8,7 +9,6 @@ import com.yihu.jw.quota.etl.Contant;
import com.yihu.jw.quota.etl.extract.ExtractUtil;
import com.yihu.jw.quota.etl.model.EsConfig;
import com.yihu.jw.quota.etl.util.ElasticsearchUtil;
import com.yihu.jw.quota.etl.util.EsClientUtil;
import com.yihu.jw.quota.service.orgHealthCategory.OrgHealthCategoryStatisticsService;
import com.yihu.jw.quota.service.quota.BaseStatistsService;
import com.yihu.jw.quota.vo.QuotaVo;
@ -33,7 +33,7 @@ import java.util.*;
public class EsExtract {
    private Logger logger = LoggerFactory.getLogger(EsExtract.class);
    @Autowired
    private EsClientUtil esClientUtil;
    private ElasticSearch7Helper esClientUtil;
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired

+ 13 - 18
svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/extract/es/EsResultExtract.java

@ -2,7 +2,7 @@ package com.yihu.jw.quota.etl.extract.es;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.jw.elasticsearch.ElasticSearchPool;
import com.yihu.jw.elasticsearch.ElasticSearch7Pool;
import com.yihu.jw.entity.ehr.quota.TjDataSave;
import com.yihu.jw.entity.ehr.quota.TjQuota;
import com.yihu.jw.entity.ehr.quota.TjQuotaDataSave;
@ -18,8 +18,8 @@ import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
import org.elasticsearch.search.aggregations.metrics.InternalSum;
import org.elasticsearch.search.aggregations.metrics.InternalValueCount;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -57,7 +57,7 @@ public class EsResultExtract {
    @Autowired
    ElasticsearchUtil elasticsearchUtil;
    @Autowired
    ElasticSearchPool elasticSearchPool;
    ElasticSearch7Pool elasticSearchPool;
    @Autowired
    private TjDataSaveService tjDataSaveService;
    @Autowired
@ -146,10 +146,9 @@ public class EsResultExtract {
        initialize(tjQuota,filters);
        BoolQueryBuilder boolQueryBuilder =  QueryBuilders.boolQuery();
        getBoolQueryBuilder(boolQueryBuilder);
        TransportClient client = elasticSearchPool.getClient();
        List<Map<String, Object>> restltList = null;
        try {
            restltList =  elasticsearchUtil.queryPageList(client, esConfig.getIndex(), esConfig.getType(), boolQueryBuilder, pageNo, pageSize,"quotaDate");
            restltList =  elasticsearchUtil.queryPageList(esConfig.getIndex(),boolQueryBuilder, pageNo, pageSize,"quotaDate");
        } catch (Exception e){
            e.getMessage();
        }
@ -160,10 +159,9 @@ public class EsResultExtract {
        initialize(tjQuota,filters);
        BoolQueryBuilder boolQueryBuilder =  QueryBuilders.boolQuery();
        getBoolQueryBuilder(boolQueryBuilder);
        TransportClient  client = elasticSearchPool.getClient();
        int count  = 0;
        try {
            count  = (int)elasticsearchUtil.getTotalCount(client,esConfig.getIndex(),esConfig.getType(),boolQueryBuilder);
            count  = (int)elasticsearchUtil.getTotalCount(esConfig.getIndex(),boolQueryBuilder);
        }catch (Exception e){
            e.getMessage();
        }
@ -175,10 +173,9 @@ public class EsResultExtract {
        initialize(tjQuota,filters);
        BoolQueryBuilder boolQueryBuilder =  QueryBuilders.boolQuery();
        getBoolQueryBuilder(boolQueryBuilder);
        TransportClient  client = elasticSearchPool.getClient();
        List<Map<String, Object>> list = null;
        try {
           list = elasticsearchUtil.queryList(client,esConfig.getIndex(),esConfig.getType(),boolQueryBuilder, "quotaDate",size);
           list = elasticsearchUtil.queryList(esConfig.getIndex(),esConfig.getType(),boolQueryBuilder, "quotaDate",size);
        }catch (Exception e){
            e.getMessage();
        }
@ -215,7 +212,7 @@ public class EsResultExtract {
        if( !StringUtils.isEmpty(org) ){
            String [] orgvals =org.split(",");
            for(int i=0;i<orgvals.length ; i++){
                MatchQueryBuilder termOrg = QueryBuilders.matchPhraseQuery("org", orgvals[i]);
                QueryBuilder termOrg = QueryBuilders.matchPhraseQuery("org", orgvals[i]);
                qbChild.should(termOrg);
            }
            boolQueryBuilder.must(qbChild);
@ -275,21 +272,21 @@ public class EsResultExtract {
            Terms.Bucket b =  gradeBucketIt.next();
            if (b.getAggregations().asList().get(0) instanceof StringTerms) {
                StringTerms stringTermsCh = (StringTerms) b.getAggregations().asList().get(0);
                Iterator<Terms.Bucket> gradeBucketItCh = stringTermsCh.getBuckets().iterator();
                Iterator gradeBucketItCh = stringTermsCh.getBuckets().iterator();
                while (gradeBucketItCh.hasNext()) {
                    StringBuffer sbTemp = new StringBuffer((sb == null ? "" : (sb.toString() + "-")) + b.getKey());
                    expainJson(gradeBucketItCh, map, sbTemp);
                }
            }else if (b.getAggregations().asList().get(0) instanceof LongTerms) {
                LongTerms longTermsCh = (LongTerms) b.getAggregations().asList().get(0);
                Iterator<Terms.Bucket> gradeBucketItCh = longTermsCh.getBuckets().iterator();
                Iterator gradeBucketItCh = longTermsCh.getBuckets().iterator();
                while (gradeBucketItCh.hasNext()) {
                    StringBuffer sbTemp = new StringBuffer((sb == null ? "" : (sb.toString() + "-")) + b.getKey());
                    expainJson(gradeBucketItCh, map, sbTemp);
                }
            }else if (b.getAggregations().asList().get(0) instanceof DoubleTerms) {
                DoubleTerms doubleTermsCh = (DoubleTerms) b.getAggregations().asList().get(0);
                Iterator<Terms.Bucket> gradeBucketItCh = doubleTermsCh.getBuckets().iterator();
                Iterator gradeBucketItCh = doubleTermsCh.getBuckets().iterator();
                while (gradeBucketItCh.hasNext()) {
                    StringBuffer sbTemp = new StringBuffer((sb == null ? "" : (sb.toString() + "-")) + b.getKey());
                    expainJson(gradeBucketItCh, map, sbTemp);
@ -313,10 +310,9 @@ public class EsResultExtract {
        initialize(tjQuota,filters);
        BoolQueryBuilder boolQueryBuilder =  QueryBuilders.boolQuery();
        getBoolQueryBuilder(boolQueryBuilder);
        TransportClient client = elasticSearchPool.getClient();
        List<Map<String, Object>> list = null;
        try {
            list = elasticsearchUtil.searcherByGroup(client,esConfig.getIndex(),esConfig.getType(), boolQueryBuilder, aggsField, "result");
            list = elasticsearchUtil.searcherByGroup(esConfig.getIndex(), boolQueryBuilder, aggsField, "result");
        } catch (Exception e){
            e.getMessage();
        }
@ -331,10 +327,9 @@ public class EsResultExtract {
        }else {
            filter = filter + " and quotaCode='" + tjQuota.getCode().replaceAll("_","") + "' ";
        }
        TransportClient client = elasticSearchPool.getClient();
        Map<String, Integer> map = null;
        try {
            map = elasticsearchUtil.searcherSumByGroupBySql(client, esConfig.getIndex(), aggsFields, filter, sumField,orderFild,order);;
            map = elasticsearchUtil.searcherSumByGroupBySql(esConfig.getIndex(), aggsFields, filter, sumField,orderFild,order);
        } catch (Exception e){
            e.getMessage();
        }
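
The buckets above are still read by casting to StringTerms/LongTerms/DoubleTerms and the relocated InternalSum/InternalValueCount classes. A hedged alternative sketch that reads the same data through the public Terms and Sum interfaces of the 7.x client, assuming a terms aggregation named "<aggsField>_val" with a sum sub-aggregation "<sumField>_count" as built elsewhere in this commit:

    import java.util.HashMap;
    import java.util.Map;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.search.aggregations.bucket.terms.Terms;
    import org.elasticsearch.search.aggregations.metrics.Sum;

    public class AggReader {
        public static Map<String, Double> readSums(SearchResponse response, String aggsField, String sumField) {
            Map<String, Double> result = new HashMap<>();
            Terms terms = response.getAggregations().get(aggsField + "_val");
            for (Terms.Bucket bucket : terms.getBuckets()) {
                Sum sum = bucket.getAggregations().get(sumField + "_count"); // sum sub-aggregation
                result.put(bucket.getKeyAsString(), sum.getValue());
            }
            return result;
        }
    }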

+ 17 - 23
svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/save/LargDataWithRunnable.java

@ -1,16 +1,17 @@
package com.yihu.jw.quota.etl.save;
import com.yihu.jw.quota.etl.model.EsConfig;
import com.yihu.jw.quota.etl.util.EsClientUtil;
import com.yihu.jw.quota.vo.SaveModel;
import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Index;
import net.sf.json.JSONObject;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
@ -20,7 +21,8 @@ import java.util.List;
public class LargDataWithRunnable implements Runnable {
    private Logger logger = LoggerFactory.getLogger(LargDataWithRunnable.class);
    private EsClientUtil esClientUtil;
    private RestHighLevelClient restHighLevelClient;
    private String jsonConfig;
    private List<SaveModel> list;//data waiting to be processed
    private int threadCount = 0;//number of worker threads
@ -29,7 +31,7 @@ public class LargDataWithRunnable implements Runnable {
    private int totalCount = 0;//total number of records to process
    private int havedCount = 0;//number of records already processed
    public LargDataWithRunnable(List<SaveModel> saveModels, String jsonConfig , EsClientUtil esClientUtil){
    public LargDataWithRunnable(List<SaveModel> saveModels, String jsonConfig, RestHighLevelClient restHighLevelClient){
        this.list = saveModels;
        int count  = saveModels.size()/perCount;
        int remainder = saveModels.size()%perCount;
@ -39,7 +41,6 @@ public class LargDataWithRunnable implements Runnable {
        this.threadCount = count;
        this.totalCount = list.size();
        this.jsonConfig = jsonConfig;
        this.restHighLevelClient = restHighLevelClient;
        this.esClientUtil = esClientUtil;
    }
    @Override
@ -56,22 +57,22 @@ public class LargDataWithRunnable implements Runnable {
                }
                if(sublist != null) {
                    //process this batch of data
                    BulkResult br = null;
                    BulkResponse br = null;
                    boolean isSuccessed = false;
                    try {
                        //get the connection
                        EsConfig esConfig = (EsConfig) JSONObject.toBean(JSONObject.fromObject(jsonConfig), EsConfig.class);
                        JestClient jestClient = esClientUtil.getJestClient(esConfig.getHost(),esConfig.getPort());
                        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esConfig.getIndex()).defaultType(esConfig.getType());
                        BulkRequest bulkRequest = new BulkRequest();
                        for (SaveModel obj : sublist) {
                            obj.setCreateTime( new Date());
                            Index index = new Index.Builder(obj).build();
                            bulk.addAction(index);
                            IndexRequest indexRequest = new IndexRequest(esConfig.getIndex());
                            indexRequest.source(JSONObject.fromObject(obj).toString(), XContentType.JSON);//serialize the model to a JSON string
                            bulkRequest.add(indexRequest);
                        }
                        br = jestClient.execute(bulk.build());
                        br = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                        //close the connection
                        jestClient.shutdownClient();
                        isSuccessed = br.isSucceeded();
                        // the shared RestHighLevelClient stays open for the other worker threads
                        isSuccessed = !br.hasFailures();
                    }catch (Exception e){
                        throw new RuntimeException("ES 保存数据异常");
                    }
@ -138,11 +139,4 @@ public class LargDataWithRunnable implements Runnable {
        this.jsonConfig = jsonConfig;
    }
    public EsClientUtil getEsClientUtil() {
        return esClientUtil;
    }
    public void setEsClientUtil(EsClientUtil esClientUtil) {
        this.esClientUtil = esClientUtil;
    }
}
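
The worker above only keeps a boolean for the whole bulk call. A minimal sketch, under the assumption that the documents are already JSON strings, of the same BulkRequest round trip with per-item failure reporting (BulkResponse is iterable over its item responses):

    import java.io.IOException;
    import java.util.List;
    import org.elasticsearch.action.bulk.BulkItemResponse;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.xcontent.XContentType;

    public class BulkSaver {
        public static boolean bulkIndex(RestHighLevelClient client, String index, List<String> jsonDocs) throws IOException {
            BulkRequest bulk = new BulkRequest();
            for (String json : jsonDocs) {
                bulk.add(new IndexRequest(index).source(json, XContentType.JSON));
            }
            BulkResponse response = client.bulk(bulk, RequestOptions.DEFAULT);
            if (response.hasFailures()) {
                for (BulkItemResponse item : response) {                // inspect each item
                    if (item.isFailed()) {
                        System.err.println(item.getFailureMessage());   // report the failed document
                    }
                }
                return false;
            }
            return true;
        }
    }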

+ 24 - 20
svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/save/es/ElastricSearchSave.java

@ -3,19 +3,23 @@ package com.yihu.jw.quota.etl.save.es;
import com.yihu.jw.quota.etl.Contant;
import com.yihu.jw.quota.etl.model.EsConfig;
import com.yihu.jw.quota.etl.save.LargDataWithRunnable;
import com.yihu.jw.quota.etl.util.EsClientUtil;
import com.yihu.jw.quota.vo.SaveModel;
import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Index;
import net.sf.json.JSONObject;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
@ -29,9 +33,9 @@ import java.util.List;
public class ElastricSearchSave {
    private Logger logger = LoggerFactory.getLogger(ElastricSearchSave.class);
    @Autowired
    private EsClientUtil esClientUtil;
    private EsConfig esConfig;
    @Resource(name="restHighLevelClient")
    private RestHighLevelClient restHighLevelClient;
    public Boolean saveByMoreThred(List<SaveModel> saveModels, String jsonConfig) {
@ -58,7 +62,7 @@ public class ElastricSearchSave {
                    }
                    logger.info("data save 这是第" + (i+1) + "个线程;数据 = " + start+ " - " + end);
                    List<SaveModel> list = saveModels.subList(start, end);
                    LargDataWithRunnable dataWithRunnable = new LargDataWithRunnable(list,jsonConfig,esClientUtil);
                    LargDataWithRunnable dataWithRunnable = new LargDataWithRunnable(list,jsonConfig,restHighLevelClient);
                    Thread thread = new Thread(dataWithRunnable);
                    thread.start();
                }
@ -72,7 +76,7 @@ public class ElastricSearchSave {
    }
    public Boolean save(List<SaveModel> smss, String jsonConfig) {
        BulkResult br = null;
        BulkResponse br = null;
        boolean isSuccessed = false;
        try {
            int perCount = Contant.compute.perCount;
@ -92,31 +96,31 @@ public class ElastricSearchSave {
                        newList = smss.subList(perCount*(i-1),perCount*i);
                    }
                    //get the connection
                    JestClient jestClient = esClientUtil.getJestClient(esConfig.getHost(),esConfig.getPort());
                    Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esConfig.getIndex()).defaultType(esConfig.getType());
                    BulkRequest bulkRequest = new BulkRequest();
                    for (SaveModel obj : newList) {
                        obj.setCreateTime( new Date());
                        Index index = new Index.Builder(obj).build();
                        bulk.addAction(index);
                        IndexRequest indexRequest = new IndexRequest(esConfig.getIndex());
                        indexRequest.source(obj);
                        bulkRequest.add(indexRequest);
                    }
                    br = jestClient.execute(bulk.build());
                    br = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT) ;
                    //close the connection
                    jestClient.shutdownClient();
                    isSuccessed = br.isSucceeded();
                    // keep the shared client open; later iterations of the loop reuse it
                    isSuccessed = !br.hasFailures();
                }
            }else{
                //get the connection
                JestClient jestClient = esClientUtil.getJestClient(esConfig.getHost(),esConfig.getPort());
                Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esConfig.getIndex()).defaultType(esConfig.getType());
                BulkRequest request = new BulkRequest();
                for (SaveModel obj : smss) {
                    obj.setCreateTime( new Date());
                    Index index = new Index.Builder(obj).build();
                    bulk.addAction(index);
                    IndexRequest indexRequest = new IndexRequest(esConfig.getIndex());
                    indexRequest.source(JSONObject.fromObject(obj).toString(), XContentType.JSON);//serialize the model to a JSON string
                    request.add(indexRequest);
                }
                br = jestClient.execute(bulk.build());
                br = restHighLevelClient.bulk(request,RequestOptions.DEFAULT);
                //close the connection
                jestClient.shutdownClient();
                isSuccessed = br.isSucceeded();
                // keep the shared client open for subsequent requests
                isSuccessed = !br.hasFailures();
            }
            return isSuccessed;
        } catch (Exception e) {

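The perCount arithmetic in save() slices the list into fixed-size batches, with the remainder going into the last batch. A small generic helper that expresses the same slicing, shown only to make the index math explicit:

    import java.util.ArrayList;
    import java.util.List;

    public class Batches {
        public static <T> List<List<T>> partition(List<T> source, int batchSize) {
            List<List<T>> batches = new ArrayList<>();
            for (int start = 0; start < source.size(); start += batchSize) {
                int end = Math.min(start + batchSize, source.size());  // last batch takes the remainder
                batches.add(source.subList(start, end));               // subList is a view, not a copy
            }
            return batches;
        }
    }
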
+ 75 - 71
svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/util/ElasticsearchUtil.java

@ -7,24 +7,29 @@ import com.alibaba.druid.sql.parser.SQLExprParser;
import com.alibaba.druid.sql.parser.Token;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.jw.elasticsearch.ElasticSearchPool;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.*;
import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
import org.elasticsearch.search.aggregations.metrics.InternalSum;
import org.elasticsearch.search.aggregations.metrics.InternalValueCount;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
@ -39,6 +44,8 @@ import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
/**
@ -49,8 +56,9 @@ public class ElasticsearchUtil {
    @Autowired
    ObjectMapper objectMapper;
    @Autowired
    private ElasticSearchPool elasticSearchPool;
    @Resource(name="restHighLevelClient")
    private RestHighLevelClient restHighLevelClient;
    /**
@ -60,19 +68,17 @@ public class ElasticsearchUtil {
     * @param sortName name of the field to sort by
     * @return
     */
    public List<Map<String, Object>> queryPageList(Client client, String index, String type, BoolQueryBuilder boolQueryBuilder,
                                                   int pageNo, int pageSize, String sortName){
        SearchResponse actionGet = null;
    public List<Map<String, Object>> queryPageList(String index, BoolQueryBuilder boolQueryBuilder,
                                                   int pageNo, int pageSize, String sortName) throws IOException {
        SortBuilder dealSorter = SortBuilders.fieldSort(sortName).order(SortOrder.DESC);
        actionGet = client.prepareSearch(index)
                .setTypes(type)
                .setQuery(boolQueryBuilder)
                .setFrom(pageNo - 1).setSize(pageSize).addSort(dealSorter)//从0开始算
                .execute().actionGet();
        SearchHits hits = actionGet.getHits();
        SearchRequest request = new SearchRequest(index);
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.query(boolQueryBuilder).sort(dealSorter).from(pageNo - 1).size(pageSize);
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        List<Map<String, Object>> matchRsult = new LinkedList<Map<String, Object>>();
        for (SearchHit hit : hits.getHits()){
            matchRsult.add(hit.getSource());
            matchRsult.add(hit.getSourceAsMap());
        }
        return matchRsult;
    }
@ -81,15 +87,15 @@ public class ElasticsearchUtil {
     * @param boolQueryBuilder query condition builder
     * @return
     */
    public long getTotalCount(Client client,String index,String type,BoolQueryBuilder boolQueryBuilder){
        SearchResponse actionGet = null;
        actionGet = client.prepareSearch(index)
                .setTypes(type)
                .setQuery(boolQueryBuilder)
                .execute().actionGet();
        SearchHits hits = actionGet.getHits();
    public long getTotalCount(String index,BoolQueryBuilder boolQueryBuilder) throws IOException {
        SearchRequest request = new SearchRequest(index);
        SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
        searchBuilder.query(boolQueryBuilder);
        request.source(searchBuilder);
        SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
       if(hits != null){
           return hits.totalHits();
           return hits.getTotalHits().value;
       }
        return 0;
    }
@ -99,7 +105,7 @@ public class ElasticsearchUtil {
     * @param sortName name of the field to sort by
     * @return
     */
    public List<Map<String, Object>> queryList(Client client,String index,String type,BoolQueryBuilder boolQueryBuilder,String sortName,int size){
    public List<Map<String, Object>> queryList(String index,String type,BoolQueryBuilder boolQueryBuilder,String sortName,int size) throws IOException {
        SearchResponse actionGet = null;
        SortBuilder dealSorter = null;
        if(sortName != null){
@ -107,17 +113,13 @@ public class ElasticsearchUtil {
        }else{
            dealSorter = SortBuilders.fieldSort("_id").order(SortOrder.DESC);
        }
        actionGet = client.prepareSearch(index)
                .setTypes(type)
                .setSize(size)
                .setQuery(boolQueryBuilder)
                .addSort(dealSorter)
                .execute().actionGet();
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.source(new SearchSourceBuilder().query(boolQueryBuilder).sort(dealSorter).size(size));
        actionGet = restHighLevelClient.search(searchRequest,RequestOptions.DEFAULT);
        SearchHits hits = actionGet.getHits();
        List<Map<String, Object>> matchRsult = new LinkedList<Map<String, Object>>();
        for (SearchHit hit : hits.getHits()){
            Map<String, Object> map = new HashMap<>() ;
            map = hit.getSource();
            map = hit.getSourceAsMap();
            map.put("id",hit.getId());
            matchRsult.add(map);
        }
@ -132,24 +134,25 @@ public class ElasticsearchUtil {
     * @param sumField field to sum (only a single field is supported)
     * @return
     */
    public List<Map<String, Object>> searcherByGroup(Client client, String index, String type, BoolQueryBuilder queryBuilder, String aggsField , String sumField) {
    public List<Map<String, Object>> searcherByGroup( String index, BoolQueryBuilder queryBuilder, String aggsField , String sumField) throws IOException {
        List<Map<String, Object>> list = new ArrayList<>();
        SearchRequestBuilder searchRequestBuilder =
                client.prepareSearch(index)
                .setTypes(type)
                .setQuery(queryBuilder);
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.query(queryBuilder);
        SearchRequest request =new SearchRequest(index);
        //create a terms aggregation named <aggsField>_val and group documents by the aggsField field
        TermsBuilder termsBuilder = AggregationBuilders.terms(aggsField+"_val").field(aggsField);
        SumBuilder ageAgg = AggregationBuilders.sum(sumField+"_count").field(sumField);
        searchRequestBuilder.addAggregation(termsBuilder.subAggregation(ageAgg));
        TermsAggregationBuilder termsBuilder = AggregationBuilders.terms(aggsField+"_val").field(aggsField);
        SumAggregationBuilder ageAgg = AggregationBuilders.sum(sumField+"_count").field(sumField);
        builder.aggregation(termsBuilder.subAggregation(ageAgg));
        request.source(builder);
        Map<String, Object> dataMap = new HashMap<String, Object>();
        //execute the search
        SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
        SearchResponse searchResponse = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        //parse the response and read the buckets of the <aggsField>_val aggregation
        Terms terms = searchResponse.getAggregations().get(aggsField+"_val");
        Collection<Terms.Bucket> buckets = terms.getBuckets();
        Collection<Terms.Bucket> buckets = (Collection<Terms.Bucket>) terms.getBuckets();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKey().toString();
            if (bucket.getAggregations().asList().get(0) instanceof InternalSum) {//(sum(xx))
@ -173,9 +176,9 @@ public class ElasticsearchUtil {
     * @param order sort direction, asc or desc
     * @return
     */
    public Map<String, Integer> searcherSumByGroupBySql(Client client,String index, String aggsFields ,String filter , String sumField,String orderFild,String order) throws Exception {
    public Map<String, Integer> searcherSumByGroupBySql(String index, String aggsFields ,String filter , String sumField,String orderFild,String order) throws Exception {
        Map<String,Integer> map = new LinkedHashMap<>();
        Client client = (Client) restHighLevelClient.getLowLevelClient();
//       String mysql1 = "select org ,sum(result) from quota where quotaCode='depart_treat_count' group by org  ";id=16
        StringBuffer mysql = new StringBuffer("select ");
        mysql.append(aggsFields)
@ -212,7 +215,7 @@ public class ElasticsearchUtil {
        //from here on it is a plain Elasticsearch query
        SearchResponse response = (SearchResponse) requestBuilder.get();
        StringTerms stringTerms = (StringTerms) response.getAggregations().asList().get(0);
        Iterator<Terms.Bucket> gradeBucketIt = stringTerms.getBuckets().iterator();
        Iterator gradeBucketIt = stringTerms.getBuckets().iterator();
        //the map holds entries such as 350200-5-2-2 (main dimension - sub dimension 1 - sub dimension 2) mapped to a value
        //recursively parse the aggregation json
        expainJson(gradeBucketIt, map, null);
@ -227,11 +230,11 @@ public class ElasticsearchUtil {
     * @return
     * @throws IOException
     */
    public boolean save(Client client,String index,String type,String source) throws JsonProcessingException {
        IndexResponse indexResponse = client
                .prepareIndex(index, type, null)
                .setSource(source).get();
        boolean result =  indexResponse.isCreated();
    public boolean save(Client client,String index,String type,String source) throws IOException {
        IndexRequest request = new IndexRequest(index);
        request.source(source, XContentType.JSON);
        IndexResponse indexResponse = restHighLevelClient.index(request,RequestOptions.DEFAULT);
        boolean result = indexResponse.getShardInfo().getSuccessful() > 0;
        return result;
    }
@ -239,24 +242,25 @@ public class ElasticsearchUtil {
     * query first, then delete the matching documents
     * @param boolQueryBuilder
     */
    public synchronized  boolean queryDelete(Client client,String index,String type,BoolQueryBuilder boolQueryBuilder){
        BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
    public synchronized  boolean queryDelete(String index,BoolQueryBuilder boolQueryBuilder) throws IOException {
        BulkRequest builder = new BulkRequest();
        DeleteRequestBuilder deleteRequestBuilder = null ;
        SearchResponse actionGet = null;
        actionGet = client.prepareSearch(index)
                .setTypes(type)
                .setSize(10000)
                .setQuery(boolQueryBuilder)
                .execute().actionGet();
        SearchHits hits = actionGet.getHits();
        SearchRequest request = new SearchRequest(index);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.size(10000);
        sourceBuilder.query(boolQueryBuilder);
        request.source(sourceBuilder);
        SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        for (SearchHit hit : hits.getHits()){
            deleteRequestBuilder = client.prepareDelete(index, type, hit.getId());
            bulkRequestBuilder.add(deleteRequestBuilder.request());
            DeleteRequest deleteRequest = new DeleteRequest(index);
            deleteRequest.id(hit.getId());
            builder.add(deleteRequest);
        }
        //perform the bulk delete
        boolean optFlag = true;
        if(hits.getHits() != null && hits.getHits().length > 0){
            BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
            BulkResponse bulkResponse = restHighLevelClient.bulk(builder,RequestOptions.DEFAULT);
            if (bulkResponse.hasFailures()) {
                optFlag = false;
            }else {
@ -278,21 +282,21 @@ public class ElasticsearchUtil {
            Terms.Bucket b =  gradeBucketIt.next();
            if (b.getAggregations().asList().get(0) instanceof StringTerms) {
                StringTerms stringTermsCh = (StringTerms) b.getAggregations().asList().get(0);
                Iterator<Terms.Bucket> gradeBucketItCh = stringTermsCh.getBuckets().iterator();
                Iterator gradeBucketItCh = stringTermsCh.getBuckets().iterator();
                while (gradeBucketItCh.hasNext()) {
                    StringBuffer sbTemp = new StringBuffer((sb == null ? "" : (sb.toString() + "-")) + b.getKey());
                    expainJson(gradeBucketItCh, map, sbTemp);
                }
            }else if (b.getAggregations().asList().get(0) instanceof LongTerms) {
                LongTerms longTermsCh = (LongTerms) b.getAggregations().asList().get(0);
                Iterator<Terms.Bucket> gradeBucketItCh = longTermsCh.getBuckets().iterator();
                Iterator gradeBucketItCh = longTermsCh.getBuckets().iterator();
                while (gradeBucketItCh.hasNext()) {
                    StringBuffer sbTemp = new StringBuffer((sb == null ? "" : (sb.toString() + "-")) + b.getKey());
                    expainJson(gradeBucketItCh, map, sbTemp);
                }
            }else if (b.getAggregations().asList().get(0) instanceof DoubleTerms) {
                DoubleTerms doubleTermsCh = (DoubleTerms) b.getAggregations().asList().get(0);
                Iterator<Terms.Bucket> gradeBucketItCh = doubleTermsCh.getBuckets().iterator();
                Iterator gradeBucketItCh = doubleTermsCh.getBuckets().iterator();
                while (gradeBucketItCh.hasNext()) {
                    StringBuffer sbTemp = new StringBuffer((sb == null ? "" : (sb.toString() + "-")) + b.getKey());
                    expainJson(gradeBucketItCh, map, sbTemp);
@ -316,7 +320,7 @@ public class ElasticsearchUtil {
     */
    public List<Map<String, Object>> excuteDataModel(String sql) {
        List<Map<String, Object>> returnModels = new ArrayList<>();
        TransportClient client = elasticSearchPool.getClient();
        Client client = (Client) restHighLevelClient.getLowLevelClient();
        try {
            SQLExprParser parser = new ElasticSqlExprParser(sql);
            SQLExpr expr = parser.expr();
@ -344,7 +348,7 @@ public class ElasticsearchUtil {
            } else {
                queryResult = response.getHits();
            }
            ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(queryResult, true);
            ObjectResult temp = new ObjectResultsExtractor(true, true, true,true,queryAction).extractResults(queryResult, true);
            List<String> heads = temp.getHeaders();
            temp.getLines().stream().forEach(one -> {
                try {
@ -367,8 +371,8 @@ public class ElasticsearchUtil {
    }
    public long getCountBySql(String sql) {
        TransportClient client = elasticSearchPool.getClient();
        try {
            Client client = (Client) restHighLevelClient.getLowLevelClient();
            SQLExprParser parser = new ElasticSqlExprParser(sql);
            SQLExpr expr = parser.expr();
            SQLQueryExpr queryExpr = (SQLQueryExpr) expr;
@ -391,7 +395,7 @@ public class ElasticsearchUtil {
            SearchResponse response = (SearchResponse) requestBuilder.get();
            SearchHits hits = response.getHits();
            if(hits != null){
                return hits.totalHits();
                return hits.getTotalHits().value;
            }
            return 0;
        } catch (Exception e) {

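One caveat for getTotalCount and getCountBySql above: since Elasticsearch 7, hits.getTotalHits().value is only exact up to 10,000 unless total-hit tracking is enabled (or the dedicated _count endpoint is used). A hedged sketch of a count helper that asks for an exact total; it is not the project's method, just an illustration against the high-level client:

    import java.io.IOException;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    public class CountHelper {
        public static long exactCount(RestHighLevelClient client, String index, QueryBuilder query) throws IOException {
            SearchRequest request = new SearchRequest(index)
                    .source(new SearchSourceBuilder()
                            .query(query)
                            .size(0)                   // only the total is needed, not the documents
                            .trackTotalHits(true));    // lift the default 10,000 cap
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            return response.getHits().getTotalHits().value;
        }
    }
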
+ 59 - 59
svr/svr-quota/src/main/java/com/yihu/jw/quota/etl/util/EsClientUtil.java

@ -1,59 +1,59 @@
package com.yihu.jw.quota.etl.util;
import com.yihu.jw.quota.etl.extract.es.EsExtract;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.net.InetAddress;
/**
* Created by janseny on 2017/8/1
 */
@Component
public class EsClientUtil {
    private Logger logger = LoggerFactory.getLogger(EsExtract.class);
    /**
     * @param host "localhost"
     * @param port 9200
     * @return
     */
    public JestClient getJestClient(String host, Integer port) {
        String hostAddress="http://"+host+":"+port;
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig
                .Builder(hostAddress)
                .multiThreaded(true)
                //.discoveryEnabled(true)
                .readTimeout(60000)//30秒 -60s
                .build());
        return factory.getObject();
    }
    public Client getClient(String host, Integer port,String clusterName) {
        try {
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", StringUtils.isEmpty(clusterName)?"elasticsearch":clusterName)
                    .put("client.transport.sniff", false)
                    .build();
            Client client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
            return client;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
//package com.yihu.jw.quota.etl.util;
//
//
//
//import com.yihu.jw.quota.etl.extract.es.EsExtract;
//import io.searchbox.client.JestClient;
//import io.searchbox.client.JestClientFactory;
//import io.searchbox.client.config.HttpClientConfig;
//import org.elasticsearch.client.Client;
//import org.elasticsearch.client.transport.TransportClient;
//import org.elasticsearch.common.settings.Settings;
//import org.elasticsearch.common.transport.InetSocketTransportAddress;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.stereotype.Component;
//import org.springframework.util.StringUtils;
//
//import java.net.InetAddress;
//
///**
//* Created by janseny on 2017/8/1
// */
//@Component
//public class EsClientUtil {
//    private Logger logger = LoggerFactory.getLogger(EsExtract.class);
//
//    /**
//     * @param host "localhost"
//     * @param port 9200
//     * @return
//     */
//    public JestClient getJestClient(String host, Integer port) {
//        String hostAddress="http://"+host+":"+port;
//        JestClientFactory factory = new JestClientFactory();
//        factory.setHttpClientConfig(new HttpClientConfig
//                .Builder(hostAddress)
//                .multiThreaded(true)
//                //.discoveryEnabled(true)
//                .readTimeout(60000)//30秒 -60s
//                .build());
//        return factory.getObject();
//    }
//
//    public Client getClient(String host, Integer port,String clusterName) {
//        try {
//            Settings settings = Settings.settingsBuilder()
//                    .put("cluster.name", StringUtils.isEmpty(clusterName)?"elasticsearch":clusterName)
//                    .put("client.transport.sniff", false)
//                    .build();
//            Client client = TransportClient.builder().settings(settings).build()
//                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
//            return client;
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
//        return null;
//    }
//
//}
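
With the old Jest/TransportClient factory commented out above, the rest of this commit injects a bean named "restHighLevelClient" via @Resource. The project presumably defines that bean in its elasticsearch starter configuration; an assumed minimal version, with the host taken from application.yml and no authentication, would look like this:

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RestHighLevelClientConfig {
        @Bean(name = "restHighLevelClient", destroyMethod = "close")
        public RestHighLevelClient restHighLevelClient() {
            // single-node client; credentials would be added here when pwflag = 1
            return new RestHighLevelClient(
                    RestClient.builder(new HttpHost("172.26.0.34", 9200, "http")));
        }
    }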

+ 2 - 9
svr/svr-quota/src/main/java/com/yihu/jw/quota/job/EsQuotaJob.java

@ -1,6 +1,5 @@
package com.yihu.jw.quota.job;
import com.yihu.jw.elasticsearch.ElasticSearchPool;
import com.yihu.jw.entity.ehr.quota.TjQuotaDataSource;
import com.yihu.jw.entity.ehr.quota.TjQuotaLog;
import com.yihu.jw.quota.dao.jpa.TjQuotaLogDao;
@ -58,8 +57,6 @@ public class EsQuotaJob implements Job {
    @Autowired
    ElasticsearchUtil elasticsearchUtil;
    @Autowired
    private ElasticSearchPool elasticSearchPool;
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
@ -250,13 +247,11 @@ public class EsQuotaJob implements Job {
        boolQueryBuilder.filter(qb);
        boolean flag = true ;
        Client talClient = elasticSearchPool.getClient();
        Client client = elasticSearchPool.getClient();
        try {
            while (flag){
                long count = elasticsearchUtil.getTotalCount(talClient, esConfig.getIndex() ,esConfig.getType(), boolQueryBuilder);
                long count = elasticsearchUtil.getTotalCount(esConfig.getIndex() , boolQueryBuilder);
                if(count != 0){
                    boolean successFlag = elasticsearchUtil.queryDelete(client, esConfig.getIndex() ,esConfig.getType(),boolQueryBuilder);
                    boolean successFlag = elasticsearchUtil.queryDelete(esConfig.getIndex() ,boolQueryBuilder);
                    if(!successFlag){
                        throw  new Exception("Elasticsearch 指标统计时原始数据删除失败");
                    }
@ -268,8 +263,6 @@ public class EsQuotaJob implements Job {
            e.printStackTrace();
            throw  new Exception("Elasticsearch 指标统计时原始数据删除异常");
        } finally {
            talClient.close();
            client.close();
            logger.debug(quotaVo.getCode()+" delete success");
        }
    }
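
The job above loops "count, then queryDelete" until nothing matches. As a hedged alternative (not what the commit does), the 7.x high-level client exposes delete-by-query directly, which removes all matching documents in one request:

    import java.io.IOException;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.reindex.BulkByScrollResponse;
    import org.elasticsearch.index.reindex.DeleteByQueryRequest;

    public class DeleteByQueryExample {
        public static long deleteMatching(RestHighLevelClient client, String index, QueryBuilder query) throws IOException {
            DeleteByQueryRequest request = new DeleteByQueryRequest(index);
            request.setQuery(query);
            request.setRefresh(true);                     // make the deletions visible immediately
            BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
            return response.getDeleted();                 // number of documents removed
        }
    }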

+ 2 - 7
svr/svr-quota/src/main/java/com/yihu/jw/quota/job/EsQuotaPercentJob.java

@ -7,7 +7,6 @@ import com.yihu.jw.quota.etl.extract.ExtractPercentHelper;
import com.yihu.jw.quota.etl.model.EsConfig;
import com.yihu.jw.quota.etl.save.SaveHelper;
import com.yihu.jw.quota.etl.util.ElasticsearchUtil;
import com.yihu.jw.quota.etl.util.EsClientUtil;
import com.yihu.jw.quota.etl.Contant;
import com.yihu.jw.quota.util.SpringUtil;
import com.yihu.jw.quota.vo.QuotaVo;
@ -51,8 +50,6 @@ public class EsQuotaPercentJob implements Job {
    @Autowired
    private TjQuotaLogDao tjQuotaLogDao;
    @Autowired
    private EsClientUtil esClientUtil;
    @Autowired
    private ExtractHelper extractHelper;
    @Autowired
    ElasticsearchUtil elasticsearchUtil;
@ -102,13 +99,11 @@ public class EsQuotaPercentJob implements Job {
                    RangeQueryBuilder rangeQueryEndTime = QueryBuilders.rangeQuery("quotaDate").lte(endTime);
                    boolQueryBuilder.must(rangeQueryEndTime);
                }
                Client client = esClientUtil.getClient(esConfig.getHost(), esConfig.getPort(), esConfig.getClusterName());
                try {
                    elasticsearchUtil.queryDelete(client,esConfig.getIndex(),esConfig.getType(),boolQueryBuilder);
                    elasticsearchUtil.queryDelete(esConfig.getIndex(),boolQueryBuilder);
                }catch (Exception e){
                    e.getMessage();
                }finally {
                    client.close();
                }
                List<SaveModel> dataSaveModels = new ArrayList<>();
                for(SaveModel saveModel :dataModels){
@ -171,7 +166,7 @@ public class EsQuotaPercentJob implements Job {
    }
    @Transactional
    private void saveLog(TjQuotaLog tjQuotaLog) {
    void saveLog(TjQuotaLog tjQuotaLog) {
        tjQuotaLogDao.save(tjQuotaLog);
    }
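
A note on the saveLog change above: with Spring's proxy-based AOP, @Transactional only takes effect on public methods invoked through a Spring proxy, and a call from execute() to saveLog() inside the same job bypasses the proxy entirely. An assumed sketch of how the write could be moved into its own bean so the annotation applies (TjQuotaLog and TjQuotaLogDao are the project's own types):

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;

    @Service
    public class QuotaLogWriter {
        @Autowired
        private TjQuotaLogDao tjQuotaLogDao;

        @Transactional
        public void saveLog(TjQuotaLog tjQuotaLog) {
            tjQuotaLogDao.save(tjQuotaLog);   // runs inside a transaction when called through this bean
        }
    }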

+ 12 - 0
svr/svr-quota/src/main/resources/application.yml

@ -99,6 +99,18 @@ spring:
elasticsearch:
  cluster-name: elasticsearch
  cluster-nodes: 172.26.0.34:9300
es:
  pwflag: 1 # 1: password required, 2: no password
  index:
    Statistics: hlw_quota_test
  type:
    Statistics: hlw_quota_test
  host:  http://172.26.0.34:9200
  tHost: 172.26.0.34:9300
  clusterName: jkzl
  securityUser: elastic:elastic
  user: lion
  password: jkzlehr
hadoop:
  hbase-properties:
    hbase.zookeeper.quorum: node1.hde.h3c.com,node2.hde.h3c.com,node3.hde.h3c.com
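
The new es: block adds connection, index and credential settings alongside the existing elasticsearch: section. How the services read these keys is not shown in this commit; an assumed sketch of a typed binding for part of the block (the project may simply use @Value on individual keys):

    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;

    @Component
    @ConfigurationProperties(prefix = "es")
    public class EsProperties {
        private String pwflag;       // 1: password required, 2: no password
        private String host;         // e.g. http://172.26.0.34:9200
        private String clusterName;  // e.g. jkzl
        private String user;
        private String password;

        public String getPwflag() { return pwflag; }
        public void setPwflag(String pwflag) { this.pwflag = pwflag; }
        public String getHost() { return host; }
        public void setHost(String host) { this.host = host; }
        public String getClusterName() { return clusterName; }
        public void setClusterName(String clusterName) { this.clusterName = clusterName; }
        public String getUser() { return user; }
        public void setUser(String user) { this.user = user; }
        public String getPassword() { return password; }
        public void setPassword(String password) { this.password = password; }
    }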