Browse Source

代码修改

LAPTOP-KB9HII50\70708 2 years ago
parent
commit
bcc223d79a

+ 18 - 13
business/es-service/pom.xml

@ -47,10 +47,15 @@
        </dependency>
        <!--elasticsearch start-->
        <dependency>
            <groupId>org.nlpcn</groupId>
            <artifactId>elasticsearch-sql</artifactId>
            <version>${version.elasticsearch-sql}</version>
            <groupId>com.yihu.jw</groupId>
            <artifactId>elasticsearch-starter</artifactId>
            <version>${version.wlyy-common}</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.nlpcn</groupId>-->
<!--            <artifactId>elasticsearch-sql</artifactId>-->
<!--            <version>${version.elasticsearch-sql}</version>-->
<!--        </dependency>-->
        <!--elasticsearch start-->
        <dependency>
            <groupId>commons-collections</groupId>
@ -62,16 +67,16 @@
            <artifactId>jest</artifactId>
            <version>${version.jest}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${version.elasticsearch}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>shield</artifactId>
            <version>${version.elasticsearch}</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.elasticsearch</groupId>-->
<!--            <artifactId>elasticsearch</artifactId>-->
<!--            <version>${version.elasticsearch}</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.elasticsearch.plugin</groupId>-->
<!--            <artifactId>shield</artifactId>-->
<!--            <version>${version.elasticsearch}</version>-->
<!--        </dependency>-->
        <!--   poi xml导入导出工具 end -->
    </dependencies>

+ 154 - 154
business/es-service/src/main/java/com/yihu/jw/es/es/ElasticFactory.java

@ -1,154 +1,154 @@
package com.yihu.jw.es.es;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.shield.ShieldPlugin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
 * 中山医院版本
 * Created by chenweida on 2017/6/5.
 */
@Component
public class ElasticFactory {
    private static JestClientFactory factory = null;
    @Value("${es.host}")
    private String esHost;//http://59.61.92.90:9065,http://59.61.92.90:9067
    @Value("${es.tHost}")
    private String tHost;// 59.61.92.90:9066,59.61.92.90:9068
    @Value("${es.clusterName}")
    private String clusterName;
    @Value("${es.securityUser}")
    private String securityUser;
    @Value("${es.pwflag}")
    private String pwflag;
    @Value("${es.user}")
    private String user;
    @Value("${es.password}")
    private String password;
//-----------------------------------jestClient----------------------------------------
    /**
     * @param "http://localhost:9200"
     * @return
     */
    public JestClient getJestClient() {
        if (factory == null) {
            //初始化链接
            init();
        }
        return factory.getObject();
    }
    /**
     * 初始化链接
     * 9200
     */
    public synchronized void init() {
        String[] hostArray = esHost.split(",");
        // Construct a new Jest client according to configuration via factory
        factory = new JestClientFactory();
        HttpClientConfig httpClientConfig = new HttpClientConfig
                .Builder(Arrays.asList(hostArray))
                .multiThreaded(true)
                .defaultCredentials(user,password)
                .maxTotalConnection(50)// 最大链接
                .maxConnectionIdleTime(10, TimeUnit.MINUTES)//链接等待时间
                .connTimeout(60 * 1000*10)
                // .discoveryEnabled(true)
                .readTimeout(60 * 1000*10)//60秒
                .build();
        factory.setHttpClientConfig(httpClientConfig);//得到链接
    }
    //-----------------------------------TransportClient----------------------------------------
    private TransportClient transportClient;
    public Client getTransportClient() {
        try {
            //1需要加密初始化
            if("1".equals(pwflag)){
                initTranClientPw();
            }else {
                initTranClient();
            }
            return transportClient;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     * 9300
     * 互联网医院版本需要密码
     * @throws UnknownHostException
     */
    private synchronized void initTranClientPw() throws Exception {
        if (transportClient == null) {
            String[] hosts = tHost.split(",");
            Settings settings = getSettings();
            transportClient = TransportClient.builder().addPlugin(ShieldPlugin.class).settings(settings).build();
            for (String oneHost : hosts) {
                String[] hostAndport = oneHost.split(":");
                transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostAndport[0]), Integer.valueOf(hostAndport[1])));
            }
        }
    }
    /**
     * 9300
     *
     * @throws UnknownHostException
     */
    private synchronized void initTranClient() throws UnknownHostException {
        if (transportClient == null) {
            String[] hosts = tHost.split(",");
            Settings settings = Settings.settingsBuilder()
                    // .put("client.transport.sniff", true)//开启嗅探功能
                    .put("cluster.name", StringUtils.isEmpty(clusterName) ? "jkzl" : clusterName)//默认集群名字是jkzl
                    .build();
            transportClient = TransportClient.builder().settings(settings).build();
            for (String oneHost : hosts) {
                String[] hostAndport = oneHost.split(":");
                transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostAndport[0]), Integer.valueOf(hostAndport[1])));
            }
        }
    }
    /**
     * 配置连接
     * @return
     * @throws Exception
     */
    private Settings getSettings() throws Exception {
        Settings.Builder settingBuilder = Settings.settingsBuilder();
        settingBuilder.put("cluster.name", clusterName);
        settingBuilder.put("shield.user", securityUser);
        settingBuilder.put("client.transport.sniff", false);
        settingBuilder.put("transport.address.list", tHost);
        return settingBuilder.build();
    }
}
//package com.yihu.jw.es.es;
//
//import io.searchbox.client.JestClient;
//
//import io.searchbox.client.JestClientFactory;
//import io.searchbox.client.config.HttpClientConfig;
//import org.elasticsearch.client.Client;
//import org.elasticsearch.client.transport.TransportClient;
//import org.elasticsearch.common.settings.Settings;
//import org.elasticsearch.common.transport.InetSocketTransportAddress;
//import org.elasticsearch.shield.ShieldPlugin;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.stereotype.Component;
//import org.springframework.util.StringUtils;
//
//import java.net.InetAddress;
//import java.net.UnknownHostException;
//import java.util.Arrays;
//import java.util.concurrent.TimeUnit;
//
///**
// * 中山医院版本
// * Created by chenweida on 2017/6/5.
// */
//
//@Component
//public class ElasticFactory {
//    private static JestClientFactory factory = null;
//
//    @Value("${es.host}")
//    private String esHost;//http://59.61.92.90:9065,http://59.61.92.90:9067
//    @Value("${es.tHost}")
//    private String tHost;// 59.61.92.90:9066,59.61.92.90:9068
//    @Value("${es.clusterName}")
//    private String clusterName;
//    @Value("${es.securityUser}")
//    private String securityUser;
//    @Value("${es.pwflag}")
//    private String pwflag;
//    @Value("${es.user}")
//    private String user;
//    @Value("${es.password}")
//    private String password;
////-----------------------------------jestClient----------------------------------------
//
//    /**
//     * @param "http://localhost:9200"
//     * @return
//     */
//    public JestClient getJestClient() {
//        if (factory == null) {
//            //初始化链接
//            init();
//        }
//        return factory.getObject();
//    }
//
//    /**
//     * 初始化链接
//     * 9200
//     */
//    public synchronized void init() {
//        String[] hostArray = esHost.split(",");
//        // Construct a new Jest client according to configuration via factory
//        factory = new JestClientFactory();
//        HttpClientConfig httpClientConfig = new HttpClientConfig
//                .Builder(Arrays.asList(hostArray))
//                .multiThreaded(true)
//                .defaultCredentials(user,password)
//                .maxTotalConnection(50)// 最大链接
//                .maxConnectionIdleTime(10, TimeUnit.MINUTES)//链接等待时间
//                .connTimeout(60 * 1000*10)
//                // .discoveryEnabled(true)
//                .readTimeout(60 * 1000*10)//60秒
//                .build();
//
//
//        factory.setHttpClientConfig(httpClientConfig);//得到链接
//    }
//
//    //-----------------------------------TransportClient----------------------------------------
//    private TransportClient transportClient;
//
//    public Client getTransportClient() {
//        try {
//            //1需要加密初始化
//            if("1".equals(pwflag)){
//                initTranClientPw();
//            }else {
//                initTranClient();
//            }
//            return transportClient;
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
//        return null;
//    }
//
//    /**
//     * 9300
//     * 互联网医院版本需要密码
//     * @throws UnknownHostException
//     */
//    private synchronized void initTranClientPw() throws Exception {
//        if (transportClient == null) {
//            String[] hosts = tHost.split(",");
//            Settings settings = getSettings();
//            transportClient = TransportClient.builder().addPlugin(ShieldPlugin.class).settings(settings).build();
//            for (String oneHost : hosts) {
//                String[] hostAndport = oneHost.split(":");
//                transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostAndport[0]), Integer.valueOf(hostAndport[1])));
//            }
//        }
//    }
//
//    /**
//     * 9300
//     *
//     * @throws UnknownHostException
//     */
//    private synchronized void initTranClient() throws UnknownHostException {
//        if (transportClient == null) {
//            String[] hosts = tHost.split(",");
//            Settings settings = Settings.settingsBuilder()
//                    // .put("client.transport.sniff", true)//开启嗅探功能
//                    .put("cluster.name", StringUtils.isEmpty(clusterName) ? "jkzl" : clusterName)//默认集群名字是jkzl
//                    .build();
//
//            transportClient = TransportClient.builder().settings(settings).build();
//
//            for (String oneHost : hosts) {
//                String[] hostAndport = oneHost.split(":");
//                transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostAndport[0]), Integer.valueOf(hostAndport[1])));
//            }
//        }
//    }
//
//
//    /**
//     * 配置连接
//     * @return
//     * @throws Exception
//     */
//    private Settings getSettings() throws Exception {
//        Settings.Builder settingBuilder = Settings.settingsBuilder();
//        settingBuilder.put("cluster.name", clusterName);
//        settingBuilder.put("shield.user", securityUser);
//        settingBuilder.put("client.transport.sniff", false);
//        settingBuilder.put("transport.address.list", tHost);
//        return settingBuilder.build();
//    }
//
//}
//

+ 322 - 322
business/es-service/src/main/java/com/yihu/jw/es/es/ElasticSearchHelperUtil.java

@ -1,323 +1,323 @@
package com.yihu.jw.es.es;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
import com.alibaba.druid.sql.parser.SQLExprParser;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.core.*;
import org.apache.commons.collections.CollectionUtils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.nlpcn.es4sql.domain.Select;
import org.nlpcn.es4sql.jdbc.ObjectResult;
import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
import org.nlpcn.es4sql.parse.ElasticSqlExprParser;
import org.nlpcn.es4sql.parse.SqlParser;
import org.nlpcn.es4sql.query.AggregationQueryAction;
import org.nlpcn.es4sql.query.DefaultQueryAction;
import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.DecimalFormat;
import java.util.*;
;
/**
 * Created by chenweida on 2017/6/2.
 */
@Component
@Scope("prototype")
public class ElasticSearchHelperUtil {
    private Logger logger = LoggerFactory.getLogger(ElasticSearchHelperUtil.class);
    @Autowired
    private ElasticFactory elasticFactory;
    private JestClient jestClient;
    @Value("${es.host}")
    private String esHost;
    @Autowired
    private ObjectMapper objectMapper;
    private static String[] curlCmds = new String[6];
    @PostConstruct
    public void init() {
        jestClient = elasticFactory.getJestClient();
        // curl命令格式
        String url = esHost.split(",")[0].replace("http://", "").concat("/_sql");
        curlCmds[0] = "curl";
        curlCmds[1] = url;
        curlCmds[2] = "-H";
        curlCmds[3] = "Content-Type: application/json";
        curlCmds[4] = "-d";
    }
    public <T> Boolean save(String index, String type, List<T> sources) throws IOException {
        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
        sources.forEach((item) -> {
            Index indexObj = (new io.searchbox.core.Index.Builder(item)).build();
            bulk.addAction(indexObj);
        });
        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
        this.logger.debug("save flag: " + br.isSucceeded());
        return br.isSucceeded();
    }
    public Boolean save(String index, String type, String source,String errMsg) throws IOException {
        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
        Index indexObj = (new io.searchbox.core.Index.Builder(source)).build();
        bulk.addAction(indexObj);
        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
        if(!br.isSucceeded()){
            this.logger.error("save flag: " + br.isSucceeded());
            errMsg = br.getErrorMessage();
        }
        return br.isSucceeded();
    }
    public Boolean saveWithCustomId(String index, String type, String source, String idFieldString,StringBuilder errMsg) throws IOException {
        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
        JSONObject jsonObject = (JSONObject)(JSONObject.parse(source));
        Index indexObj = ((io.searchbox.core.Index.Builder)(new io.searchbox.core.Index.Builder(source)).id(jsonObject.getString(idFieldString))).build();
        bulk.addAction(indexObj);
        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
        if(!br.isSucceeded()){
            errMsg.append(br.getJsonString());
            this.logger.error("save flag: " + br.isSucceeded() + "  " + errMsg.toString());
        }
        return br.isSucceeded();
    }
    public Boolean saveBulkWithCustomId(String index, String type, List<String> sources, String idFieldString) throws IOException {
        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
        sources.forEach((item) -> {
            JSONObject jsonObject = (JSONObject)((JSONObject)JSONObject.parse(item));
            Index indexObj = ((io.searchbox.core.Index.Builder)(new io.searchbox.core.Index.Builder(item)).id(jsonObject.getString(idFieldString))).build();
            bulk.addAction(indexObj);
        });
        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
        this.logger.debug("save flag: " + br.isSucceeded());
        return br.isSucceeded();
    }
    public <T> Boolean update(String index, String type, List<T> sources) throws IOException {
        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
        sources.forEach((item) -> {
            JSONObject jo = new JSONObject();
            jo.put("doc", item);
//            Update indexObj = ((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)((Builder)(new Builder(jo.toString())).index(index)).type(type)).id((item).getId())).build();
//package com.yihu.jw.es.es;
//
//import com.alibaba.druid.sql.ast.SQLExpr;
//import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
//import com.alibaba.druid.sql.parser.SQLExprParser;
//import com.alibaba.fastjson.JSONObject;
//import com.fasterxml.jackson.core.JsonProcessingException;
//import com.fasterxml.jackson.databind.ObjectMapper;
//import io.searchbox.client.JestClient;
//import io.searchbox.client.JestResult;
//import io.searchbox.core.*;
//import org.apache.commons.collections.CollectionUtils;
//import org.elasticsearch.action.search.SearchResponse;
//import org.elasticsearch.client.Client;
//import org.nlpcn.es4sql.domain.Select;
//import org.nlpcn.es4sql.jdbc.ObjectResult;
//import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
//import org.nlpcn.es4sql.parse.ElasticSqlExprParser;
//import org.nlpcn.es4sql.parse.SqlParser;
//import org.nlpcn.es4sql.query.AggregationQueryAction;
//import org.nlpcn.es4sql.query.DefaultQueryAction;
//import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Scope;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.PostConstruct;
//import java.io.BufferedReader;
//import java.io.IOException;
//import java.io.InputStreamReader;
//import java.text.DecimalFormat;
//import java.util.*;
//
//;
//
///**
// * Created by chenweida on 2017/6/2.
// */
//@Component
//@Scope("prototype")
//public class ElasticSearchHelperUtil {
//
//    private Logger logger = LoggerFactory.getLogger(ElasticSearchHelperUtil.class);
//    @Autowired
//    private ElasticFactory elasticFactory;
//
//    private JestClient jestClient;
//
//    @Value("${es.host}")
//    private String esHost;
//
//    @Autowired
//    private ObjectMapper objectMapper;
//
//    private static String[] curlCmds = new String[6];
//
//    @PostConstruct
//    public void init() {
//        jestClient = elasticFactory.getJestClient();
//        // curl命令格式
//        String url = esHost.split(",")[0].replace("http://", "").concat("/_sql");
//        curlCmds[0] = "curl";
//        curlCmds[1] = url;
//        curlCmds[2] = "-H";
//        curlCmds[3] = "Content-Type: application/json";
//        curlCmds[4] = "-d";
//    }
//
//    public <T> Boolean save(String index, String type, List<T> sources) throws IOException {
//        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
//        sources.forEach((item) -> {
//            Index indexObj = (new io.searchbox.core.Index.Builder(item)).build();
//            bulk.addAction(indexObj);
        });
        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
        this.logger.debug("update flag: " + br.isSucceeded());
        return br.isSucceeded();
    }
    public Boolean updateByMap(String index, String type, List<Map<String, Object>> list) throws IOException {
        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
        Iterator var10 = list.iterator();
        while(var10.hasNext()) {
            Map map = (Map)var10.next();
            JSONObject jo = new JSONObject();
            jo.put("doc", map);
            Update indexObj = ((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)(new io.searchbox.core.Update.Builder(jo.toString())).index(index)).type(type)).id(String.valueOf(map.get("id")))).build();
            bulk.addAction(indexObj);
        }
        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
        this.logger.debug("update flag: " + br.isSucceeded());
        return br.isSucceeded();
    }
    public boolean update(String index, String type, String _id, JSONObject source) throws IOException {
        JSONObject docSource = new JSONObject();
        docSource.put("doc", source);
        Update update = ((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)(new io.searchbox.core.Update.Builder(docSource)).index(index)).type(type)).id(_id)).build();
        JestResult jestResult = this.jestClient.execute(update);
        this.logger.debug("update info: " + jestResult.isSucceeded());
        return true;
    }
    public boolean delete(String index, String type, List<Map<String, Object>> datas) throws IOException {
        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
        Iterator var5 = datas.iterator();
        while(var5.hasNext()) {
            Map map = (Map)var5.next();
            if (map.containsKey("id") && map.containsKey("_id")) {
                Delete indexObj = null;
                if (null != map.get("_id")) {
                    indexObj = (new io.searchbox.core.Delete.Builder(map.get("_id").toString())).build();
                } else if (null != map.get("id")) {
                    indexObj = (new io.searchbox.core.Delete.Builder(map.get("id").toString())).build();
                }
                bulk.addAction(indexObj);
            }
        }
        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
        this.logger.debug("delete data count: " + datas.size());
        this.logger.debug("delete flag: " + br.isSucceeded());
        return br.isSucceeded();
    }
    public SearchResult search(String index, String type, String queryStr) throws IOException {
        Search search = ((io.searchbox.core.Search.Builder)((io.searchbox.core.Search.Builder)(new io.searchbox.core.Search.Builder(queryStr)).addIndex(index)).addType(type)).build();
        SearchResult result = (SearchResult)this.jestClient.execute(search);
        this.logger.info("search data count: " + result.getTotal());
        return result;
    }
    public List<Map<String, Object>> executeSQL(String sql) throws Exception {
        List<Map<String, Object>> returnModels = new ArrayList();
        SQLExprParser parser = new ElasticSqlExprParser(sql);
        SQLExpr expr = parser.expr();
        SQLQueryExpr queryExpr = (SQLQueryExpr)expr;
//        SQLBinaryExpr queryExpr = (SQLBinaryExpr)expr;
        Select select = (new SqlParser()).parseSelect(queryExpr);
        AggregationQueryAction action = null;
        DefaultQueryAction queryAction = null;
        SqlElasticSearchRequestBuilder requestBuilder = null;
        if (select.isAgg) {
            action = new AggregationQueryAction(this.elasticFactory.getTransportClient(), select);
            requestBuilder = action.explain();
        } else {
            Client client = this.elasticFactory.getTransportClient();
            queryAction = new DefaultQueryAction(client, select);
            requestBuilder = queryAction.explain();
        }
        SearchResponse response = (SearchResponse)requestBuilder.get();
        Object queryResult = null;
        if (sql.toUpperCase().indexOf("GROUP") == -1 && sql.toUpperCase().indexOf("SUM") == -1 && sql.toUpperCase().indexOf("COUNT(") == -1) {
            queryResult = response.getHits();
        } else {
            queryResult = response.getAggregations();
        }
        ObjectResult temp = (new ObjectResultsExtractor(true, true, true)).extractResults(queryResult, true);
        List<String> heads = temp.getHeaders();
        temp.getLines().stream().forEach((one) -> {
            Map<String, Object> oneMap = new HashMap();
            for(int i = 0; i < one.size(); ++i) {
                Object value = one.get(i);
                String key = (String)heads.get(i);
                oneMap.put(key, value);
            }
            returnModels.add(oneMap);
        });
        return returnModels;
    }
    public Integer executeCountSQL(String sql) throws Exception {
        SQLQueryExpr queryExpr = new SQLQueryExpr();
        Select select = (new SqlParser()).parseSelect(queryExpr);
        SqlElasticSearchRequestBuilder requestBuilder;
        if (select.isAgg) {
            AggregationQueryAction action = new AggregationQueryAction(this.elasticFactory.getTransportClient(), select);
            requestBuilder = action.explain();
        } else {
            Client client = this.elasticFactory.getTransportClient();
            DefaultQueryAction queryAction = new DefaultQueryAction(client, select);
            requestBuilder = queryAction.explain();
        }
        SearchResponse response = (SearchResponse)requestBuilder.get();
        Object queryResult;
        if (sql.toUpperCase().indexOf("GROUP") == -1 && sql.toUpperCase().indexOf("SUM") == -1 && sql.toUpperCase().indexOf("COUNT(") == -1) {
            queryResult = response.getHits();
        } else {
            queryResult = response.getAggregations();
        }
        ObjectResult temp = (new ObjectResultsExtractor(true, true, true)).extractResults(queryResult, true);
        for(int j = 0; j < temp.getLines().size(); ++j) {
            List<Object> one = (List)temp.getLines().get(j);
            for(int i = 0; i < one.size(); ++i) {
                Object value = one.get(i);
                if (value instanceof Double) {
                    Double valueTemp = (Double)value;
                    DecimalFormat df = new DecimalFormat("######0");
                    return Integer.parseInt(df.format(valueTemp));
                }
            }
        }
        return 0;
    }
    /**
     * 使用curl方式执行 es-sql 查询操作
     * @param sql
     * @return
     */
    public List<JSONObject> execCurl(String sql){
        List<JSONObject> result = new ArrayList<>();
        curlCmds[5] = sql;
        ProcessBuilder process = new ProcessBuilder(curlCmds);
        Process p;
        StringBuilder builder = new StringBuilder();
        try {
            p = process.start();
            BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
            String line = null;
            while ((line = reader.readLine()) != null) {
                builder.append(line);
                builder.append(System.getProperty("line.separator"));
            }
        } catch (IOException e) {
            System.out.print("error");
            e.printStackTrace();
        }
        try {
            logger.info("request curlCmds:" + objectMapper.writeValueAsString(curlCmds));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        String esResponse = builder.toString();
        logger.info("esResponse:" + esResponse);
        JSONObject esResObj = JSONObject.parseObject(esResponse);
        if(null != esResObj){
            if(null == (esResObj.get("error"))){
                if(esResObj.getJSONObject("_shards").getIntValue("total") > 0){
                    List resList = JSONObject.parseObject(esResponse).getJSONObject("hits").getJSONArray("hits");
                    if(!CollectionUtils.isEmpty(resList)){
                        resList.forEach(
                                one->{
                                    JSONObject oneHit = (JSONObject)one;
                                    result.add(oneHit.getJSONObject("_source"));
                                }
                        );
                    }
                }
            }
        }
        logger.info("execCurl result:"+result.toString());
        return result;
    }
}
//        });
//        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
//        this.logger.debug("save flag: " + br.isSucceeded());
//        return br.isSucceeded();
//    }
//
//    public Boolean save(String index, String type, String source,String errMsg) throws IOException {
//        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
//        Index indexObj = (new io.searchbox.core.Index.Builder(source)).build();
//        bulk.addAction(indexObj);
//        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
//        if(!br.isSucceeded()){
//            this.logger.error("save flag: " + br.isSucceeded());
//            errMsg = br.getErrorMessage();
//        }
//        return br.isSucceeded();
//    }
//
//    public Boolean saveWithCustomId(String index, String type, String source, String idFieldString,StringBuilder errMsg) throws IOException {
//        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
//        JSONObject jsonObject = (JSONObject)(JSONObject.parse(source));
//        Index indexObj = ((io.searchbox.core.Index.Builder)(new io.searchbox.core.Index.Builder(source)).id(jsonObject.getString(idFieldString))).build();
//        bulk.addAction(indexObj);
//        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
//        if(!br.isSucceeded()){
//            errMsg.append(br.getJsonString());
//            this.logger.error("save flag: " + br.isSucceeded() + "  " + errMsg.toString());
//        }
//        return br.isSucceeded();
//    }
//
//    public Boolean saveBulkWithCustomId(String index, String type, List<String> sources, String idFieldString) throws IOException {
//        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
//        sources.forEach((item) -> {
//            JSONObject jsonObject = (JSONObject)((JSONObject)JSONObject.parse(item));
//            Index indexObj = ((io.searchbox.core.Index.Builder)(new io.searchbox.core.Index.Builder(item)).id(jsonObject.getString(idFieldString))).build();
//            bulk.addAction(indexObj);
//        });
//        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
//        this.logger.debug("save flag: " + br.isSucceeded());
//        return br.isSucceeded();
//    }
//
//    public <T> Boolean update(String index, String type, List<T> sources) throws IOException {
//        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
//        sources.forEach((item) -> {
//            JSONObject jo = new JSONObject();
//            jo.put("doc", item);
////            Update indexObj = ((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)((Builder)(new Builder(jo.toString())).index(index)).type(type)).id((item).getId())).build();
////            bulk.addAction(indexObj);
//        });
//        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
//        this.logger.debug("update flag: " + br.isSucceeded());
//        return br.isSucceeded();
//    }
//
//    public Boolean updateByMap(String index, String type, List<Map<String, Object>> list) throws IOException {
//        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
//        Iterator var10 = list.iterator();
//
//        while(var10.hasNext()) {
//            Map map = (Map)var10.next();
//            JSONObject jo = new JSONObject();
//            jo.put("doc", map);
//            Update indexObj = ((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)(new io.searchbox.core.Update.Builder(jo.toString())).index(index)).type(type)).id(String.valueOf(map.get("id")))).build();
//            bulk.addAction(indexObj);
//        }
//
//        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
//        this.logger.debug("update flag: " + br.isSucceeded());
//        return br.isSucceeded();
//    }
//
//    public boolean update(String index, String type, String _id, JSONObject source) throws IOException {
//        JSONObject docSource = new JSONObject();
//        docSource.put("doc", source);
//        Update update = ((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)(new io.searchbox.core.Update.Builder(docSource)).index(index)).type(type)).id(_id)).build();
//        JestResult jestResult = this.jestClient.execute(update);
//        this.logger.debug("update info: " + jestResult.isSucceeded());
//        return true;
//    }
//
//    public boolean delete(String index, String type, List<Map<String, Object>> datas) throws IOException {
//        Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
//        Iterator var5 = datas.iterator();
//
//        while(var5.hasNext()) {
//            Map map = (Map)var5.next();
//            if (map.containsKey("id") && map.containsKey("_id")) {
//                Delete indexObj = null;
//                if (null != map.get("_id")) {
//                    indexObj = (new io.searchbox.core.Delete.Builder(map.get("_id").toString())).build();
//                } else if (null != map.get("id")) {
//                    indexObj = (new io.searchbox.core.Delete.Builder(map.get("id").toString())).build();
//                }
//
//                bulk.addAction(indexObj);
//            }
//        }
//
//        BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
//        this.logger.debug("delete data count: " + datas.size());
//        this.logger.debug("delete flag: " + br.isSucceeded());
//        return br.isSucceeded();
//    }
//
//    public SearchResult search(String index, String type, String queryStr) throws IOException {
//        Search search = ((io.searchbox.core.Search.Builder)((io.searchbox.core.Search.Builder)(new io.searchbox.core.Search.Builder(queryStr)).addIndex(index)).addType(type)).build();
//        SearchResult result = (SearchResult)this.jestClient.execute(search);
//        this.logger.info("search data count: " + result.getTotal());
//        return result;
//    }
//
//    public List<Map<String, Object>> executeSQL(String sql) throws Exception {
//        List<Map<String, Object>> returnModels = new ArrayList();
//        SQLExprParser parser = new ElasticSqlExprParser(sql);
//        SQLExpr expr = parser.expr();
//        SQLQueryExpr queryExpr = (SQLQueryExpr)expr;
////        SQLBinaryExpr queryExpr = (SQLBinaryExpr)expr;
//        Select select = (new SqlParser()).parseSelect(queryExpr);
//        AggregationQueryAction action = null;
//        DefaultQueryAction queryAction = null;
//        SqlElasticSearchRequestBuilder requestBuilder = null;
//        if (select.isAgg) {
//            action = new AggregationQueryAction(this.elasticFactory.getTransportClient(), select);
//            requestBuilder = action.explain();
//        } else {
//            Client client = this.elasticFactory.getTransportClient();
//            queryAction = new DefaultQueryAction(client, select);
//            requestBuilder = queryAction.explain();
//        }
//
//        SearchResponse response = (SearchResponse)requestBuilder.get();
//        Object queryResult = null;
//        if (sql.toUpperCase().indexOf("GROUP") == -1 && sql.toUpperCase().indexOf("SUM") == -1 && sql.toUpperCase().indexOf("COUNT(") == -1) {
//            queryResult = response.getHits();
//        } else {
//            queryResult = response.getAggregations();
//        }
//
//        ObjectResult temp = (new ObjectResultsExtractor(true, true, true)).extractResults(queryResult, true);
//        List<String> heads = temp.getHeaders();
//        temp.getLines().stream().forEach((one) -> {
//            Map<String, Object> oneMap = new HashMap();
//
//            for(int i = 0; i < one.size(); ++i) {
//                Object value = one.get(i);
//                String key = (String)heads.get(i);
//                oneMap.put(key, value);
//            }
//
//            returnModels.add(oneMap);
//        });
//        return returnModels;
//    }
//
//    public Integer executeCountSQL(String sql) throws Exception {
//        SQLQueryExpr queryExpr = new SQLQueryExpr();
//        Select select = (new SqlParser()).parseSelect(queryExpr);
//        SqlElasticSearchRequestBuilder requestBuilder;
//        if (select.isAgg) {
//            AggregationQueryAction action = new AggregationQueryAction(this.elasticFactory.getTransportClient(), select);
//            requestBuilder = action.explain();
//        } else {
//            Client client = this.elasticFactory.getTransportClient();
//            DefaultQueryAction queryAction = new DefaultQueryAction(client, select);
//            requestBuilder = queryAction.explain();
//        }
//
//        SearchResponse response = (SearchResponse)requestBuilder.get();
//        Object queryResult;
//        if (sql.toUpperCase().indexOf("GROUP") == -1 && sql.toUpperCase().indexOf("SUM") == -1 && sql.toUpperCase().indexOf("COUNT(") == -1) {
//            queryResult = response.getHits();
//        } else {
//            queryResult = response.getAggregations();
//        }
//
//        ObjectResult temp = (new ObjectResultsExtractor(true, true, true)).extractResults(queryResult, true);
//
//        for(int j = 0; j < temp.getLines().size(); ++j) {
//            List<Object> one = (List)temp.getLines().get(j);
//
//            for(int i = 0; i < one.size(); ++i) {
//                Object value = one.get(i);
//                if (value instanceof Double) {
//                    Double valueTemp = (Double)value;
//                    DecimalFormat df = new DecimalFormat("######0");
//                    return Integer.parseInt(df.format(valueTemp));
//                }
//            }
//        }
//
//        return 0;
//    }
//
//    /**
//     * 使用curl方式执行 es-sql 查询操作
//     * @param sql
//     * @return
//     */
//    public List<JSONObject> execCurl(String sql){
//        List<JSONObject> result = new ArrayList<>();
//        curlCmds[5] = sql;
//        ProcessBuilder process = new ProcessBuilder(curlCmds);
//        Process p;
//        StringBuilder builder = new StringBuilder();
//        try {
//            p = process.start();
//            BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
//            String line = null;
//            while ((line = reader.readLine()) != null) {
//                builder.append(line);
//                builder.append(System.getProperty("line.separator"));
//            }
//
//        } catch (IOException e) {
//            System.out.print("error");
//            e.printStackTrace();
//        }
//        try {
//            logger.info("request curlCmds:" + objectMapper.writeValueAsString(curlCmds));
//        } catch (JsonProcessingException e) {
//            e.printStackTrace();
//        }
//        String esResponse = builder.toString();
//        logger.info("esResponse:" + esResponse);
//        JSONObject esResObj = JSONObject.parseObject(esResponse);
//        if(null != esResObj){
//            if(null == (esResObj.get("error"))){
//                if(esResObj.getJSONObject("_shards").getIntValue("total") > 0){
//                    List resList = JSONObject.parseObject(esResponse).getJSONObject("hits").getJSONArray("hits");
//                    if(!CollectionUtils.isEmpty(resList)){
//                        resList.forEach(
//                                one->{
//                                    JSONObject oneHit = (JSONObject)one;
//                                    result.add(oneHit.getJSONObject("_source"));
//                                }
//                        );
//
//                    }
//                }
//            }
//        }
//        logger.info("execCurl result:"+result.toString());
//        return result;
//    }
//}

+ 5 - 64
business/es-service/src/main/java/com/yihu/jw/es/es/ElastricSearchSave.java

@ -1,18 +1,14 @@
package com.yihu.jw.es.es;
import com.alibaba.fastjson.JSONObject;
import com.yihu.jw.es.util.ElasticsearchUtil;
import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Index;
import io.searchbox.core.Update;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
;
@ -26,31 +22,14 @@ public class ElastricSearchSave {
    private Logger logger = LoggerFactory.getLogger(ElastricSearchSave.class);
    @Autowired
    private ElasticFactory elasticFactory;
    private ElasticsearchUtil elasticsearchUtil;
    public Boolean save(List sms, String esIndex, String esType) {
        JestClient jestClient = null;
        try {
            //得到链接
            jestClient = elasticFactory.getJestClient();
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
            for (Object obj : sms) {
                Index index = new Index.Builder(obj).build();
                bulk.addAction(index);
            }
            BulkResult br = jestClient.execute(bulk.build());
            logger.info("save data count:" + sms.size());
            logger.info("save flag:" + br.isSucceeded());
            return br.isSucceeded();
            return elasticsearchUtil.saveList(esIndex,sms);
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(" save error :" + e.getMessage());
        } finally {
            if (jestClient != null) {
                jestClient.shutdownClient();
            }
        }
        return null;
    }
@ -59,15 +38,7 @@ public class ElastricSearchSave {
        JestClient jestClient = null;
        try {
            //得到链接
            jestClient = elasticFactory.getJestClient();
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
            Index index = new Index.Builder(obj).build();
            bulk.addAction(index);
            BulkResult br = jestClient.execute(bulk.build());
            logger.info("save flag:" + br.isSucceeded());
            return br.isSucceeded();
            return elasticsearchUtil.save(esIndex,JSONObject.toJSONString(obj));
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(" save error :" + e.getMessage());
@ -126,41 +97,11 @@ public class ElastricSearchSave {
    }*/
    public Boolean batchSave(List sms, String esIndex, String esType) {
        JestClient jestClient = null;
        List bulkList = new ArrayList();
        try {
            //得到链接
            jestClient = elasticFactory.getJestClient();
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
//            BulkResult br = null;
            for (Object obj : sms) {
                bulkList.add(obj);
                Index index = new Index.Builder(obj).build();
                bulk.addAction(index);
                if(bulkList.size() >= 500){
                    BulkResult br = jestClient.execute(bulk.build());
                    logger.info("save data count:" + bulkList.size());
                    logger.info("save flag:" + br.isSucceeded());
                    bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
                    bulkList.clear();
                }
            }
            //如果不是1千的倍数,那最后还得再存储一次
            if(sms.size() % 1000 != 0){
                BulkResult br = jestClient.execute(bulk.build());
                logger.info("save data count:" + (sms.size() / 1000));
                logger.info("save flag:" + br.isSucceeded());
            }
            return true;
            return elasticsearchUtil.saveList(esIndex,sms);
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(" save error :" + e.getMessage());
        } finally {
            if (jestClient != null) {
                jestClient.shutdownClient();
            }
        }
        return null;
    }

+ 413 - 72
business/es-service/src/main/java/com/yihu/jw/es/util/ElasticsearchUtil.java

@ -2,14 +2,38 @@ package com.yihu.jw.es.util;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
import com.alibaba.druid.sql.parser.ParserException;
import com.alibaba.druid.sql.parser.SQLExprParser;
import com.yihu.jw.es.es.ElasticFactory;
import com.alibaba.druid.sql.parser.Token;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.jw.util.date.DateUtil;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.*;
import org.elasticsearch.search.aggregations.metrics.InternalSum;
import org.elasticsearch.search.aggregations.metrics.InternalValueCount;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.nlpcn.es4sql.domain.Select;
import org.nlpcn.es4sql.jdbc.ObjectResult;
import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
@ -26,6 +50,8 @@ import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.lang.reflect.Field;
import java.sql.Timestamp;
import java.text.ParseException;
@ -44,9 +70,11 @@ public class ElasticsearchUtil {
    private Logger logger = LoggerFactory.getLogger(ElasticsearchUtil.class);
    private final String commonParams = "xmijk_quota";
    @Autowired
    private ElasticFactory elasticFactory;
    ObjectMapper objectMapper;
    @Resource(name="restHighLevelClient")
    private RestHighLevelClient restHighLevelClient;
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
@ -56,6 +84,367 @@ public class ElasticsearchUtil {
    @Value("${es.index.Statistics}")
    private String esIndex;
    /**
     * @param boolQueryBuilder  查询参数 build
     * @param pageNo
     * @param pageSize
     * @param sortName 排序字段名称
     * @return
     */
    public List<Map<String, Object>> queryPageList(String index, BoolQueryBuilder boolQueryBuilder,
                                                   int pageNo, int pageSize, String sortName) throws IOException {
        SortBuilder dealSorter = SortBuilders.fieldSort(sortName).order(SortOrder.DESC);
        SearchRequest request = new SearchRequest(index);
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.query(boolQueryBuilder).sort(dealSorter).from(pageNo - 1).size(pageSize);
        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        List<Map<String, Object>> matchRsult = new LinkedList<Map<String, Object>>();
        for (SearchHit hit : hits.getHits()){
            matchRsult.add(hit.getSourceAsMap());
        }
        return matchRsult;
    }
    /**
     * @param boolQueryBuilder  查询参数 build
     * @return
     */
    public long getTotalCount(String index,BoolQueryBuilder boolQueryBuilder) throws IOException {
        SearchRequest request = new SearchRequest(index);
        SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
        searchBuilder.query(boolQueryBuilder);
        request.source(searchBuilder);
        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        if(hits != null){
            return hits.getTotalHits().value;
        }
        return 0;
    }
    /**
     * @param boolQueryBuilder  查询参数 build
     * @param sortName 排序字段名称
     * @return
     */
    public List<Map<String, Object>> queryList(String index, String type, BoolQueryBuilder boolQueryBuilder, String sortName, int size) throws IOException {
        SearchResponse actionGet = null;
        SortBuilder dealSorter = null;
        if(sortName != null){
            dealSorter = SortBuilders.fieldSort(sortName).order(SortOrder.DESC);
        }else{
            dealSorter = SortBuilders.fieldSort("_id").order(SortOrder.DESC);
        }
        SearchRequest searchRequest = new SearchRequest(index);
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.sort(dealSorter);
        searchRequest.source(builder);
        actionGet = restHighLevelClient.search(searchRequest,RequestOptions.DEFAULT);
        SearchHits hits = actionGet.getHits();
        List<Map<String, Object>> matchRsult = new LinkedList<Map<String, Object>>();
        for (SearchHit hit : hits.getHits()){
            Map<String, Object> map = new HashMap<>() ;
            map = hit.getSourceAsMap();
            map.put("id",hit.getId());
            matchRsult.add(map);
        }
        return matchRsult;
    }
    /**
     * 执行搜索(带分组求和sum)
     * @param queryBuilder 查询内容
     * @param aggsField 要分组的字段
     * @param sumField 要求和的字段  只支持一个字段
     * @return
     */
    public List<Map<String, Object>> searcherByGroup( String index, BoolQueryBuilder queryBuilder, String aggsField , String sumField) throws IOException {
        List<Map<String, Object>> list = new ArrayList<>();
        SearchSourceBuilder builder =new  SearchSourceBuilder();
        builder.query(queryBuilder);
        SearchRequest request =new SearchRequest(index);
        //创建TermsBuilder对象,使用term查询,设置该分组的名称为 name_count,并根据aggsField字段进行分组
        TermsAggregationBuilder termsBuilder = AggregationBuilders.terms(aggsField+"_val").field(aggsField);
        SumAggregationBuilder ageAgg = AggregationBuilders.sum(sumField+"_count").field(sumField);
        builder.aggregation(termsBuilder.subAggregation(ageAgg));
        request.source(builder);
        Map<String, Object> dataMap = new HashMap<String, Object>();
        //执行搜索
        SearchResponse searchResponse = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        //解析返回数据,获取分组名称为aggs-class的数据
        Terms terms = searchResponse.getAggregations().get(aggsField+"_val");
        Collection<Terms.Bucket> buckets = (Collection<Terms.Bucket>) terms.getBuckets();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKey().toString();
            if (bucket.getAggregations().asList().get(0) instanceof InternalSum) {//(sum(xx))
                InternalSum count = (InternalSum) bucket.getAggregations().asList().get(0);
                dataMap.put(key, count.value());
            }
        }
        list.add(dataMap);
        return list;
    }
    /**
     * 根据mysql 语句进行分组求和查询
     * @param index 索引名称
     * @param aggsFields 分组字段 支持多个
     * @param filter 条件
     * @param sumField  求和字段
     * @param orderFild 排序字段
     * @param order 排序 asc,desc
     * @return
     */
    public Map<String, Integer> searcherSumByGroupBySql(String index, String aggsFields ,String filter , String sumField,String orderFild,String order) throws Exception {
        Map<String,Integer> map = new LinkedHashMap<>();
        Client client = (Client) restHighLevelClient.getLowLevelClient();
//       String mysql1 = "select org ,sum(result) from quota where quotaCode='depart_treat_count' group by org  ";id=16
        StringBuffer mysql = new StringBuffer("select ");
        mysql.append(aggsFields)
                .append(" ,sum(").append(sumField).append(") ")
                .append(" from ").append(index)
                .append(" where ").append(filter)
                .append(" group by ").append(aggsFields);
        if (org.apache.commons.lang.StringUtils.isNotEmpty(orderFild) && org.apache.commons.lang.StringUtils.isNotEmpty(order)){
            mysql.append(" order by ").append(orderFild).append(" ").append(order);
        }
        System.out.println("查询分组 mysql= " + mysql.toString());
        SQLExprParser parser = new ElasticSqlExprParser(mysql.toString());
        SQLExpr expr = parser.expr();
        if (parser.getLexer().token() != Token.EOF) {
            throw new ParserException("illegal sql expr : " + mysql);
        }
        SQLQueryExpr queryExpr = (SQLQueryExpr) expr;
        //通过抽象语法树,封装成自定义的Select,包含了select、from、where group、limit等
        Select select = null;
        select = new SqlParser().parseSelect(queryExpr);
        AggregationQueryAction action = null;
        DefaultQueryAction queryAction = null;
        SqlElasticSearchRequestBuilder requestBuilder = null;
        if (select.isAgg) {
            //包含计算的的排序分组的
            action = new AggregationQueryAction(client, select);
            requestBuilder = action.explain();
        } else {
            //封装成自己的Select对象
            queryAction = new DefaultQueryAction(client, select);
            requestBuilder = queryAction.explain();
        }
        //之后就是对ES的操作
        SearchResponse response = (SearchResponse) requestBuilder.get();
        StringTerms stringTerms = (StringTerms) response.getAggregations().asList().get(0);
        Iterator gradeBucketIt = stringTerms.getBuckets().iterator();
        //里面存放的数据 例  350200-5-2-2    主维度  细维度1  细维度2  值
        //递归解析json
        expainJson(gradeBucketIt, map, null);
        return map;
    }
    /**
     *
     * @param source 表字段组合json格式
     * @return
     */
    public boolean save(String index,String source) throws IOException {
        IndexRequest request = new IndexRequest(index);
        request.source(source);
        IndexResponse indexResponse = restHighLevelClient.index(request,RequestOptions.DEFAULT);
        boolean result =  indexResponse.isFragment();
        return result;
    }
    public <T> Boolean saveList(String index, List<T> sources) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout(TimeValue.timeValueSeconds(10));
        for (int i = 0; i < sources.size(); i++) {
            bulkRequest.add(new IndexRequest(index)
                            // 不指定ID的话,新增时ID是随机的
//                    .id(items.get(i).getId().toString())
                            .source(JSON.toJSONString(sources.get(i)), XContentType.JSON)
            );
            // bulkRequest.add(UpdateRequest)   批量更新
            // bulkRequest.add(DeleteRequest)   批量删除
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        logger.info(bulkResponse.buildFailureMessage());
        return !bulkResponse.hasFailures();
    }
    /**
     * 查询后 存在 删除
     * @param boolQueryBuilder
     */
    public synchronized  boolean queryDelete(String index,BoolQueryBuilder boolQueryBuilder) throws IOException {
        BulkRequest builder = new BulkRequest();
        DeleteRequestBuilder deleteRequestBuilder = null ;
        SearchRequest request = new SearchRequest(index);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.size(10000);
        sourceBuilder.query(boolQueryBuilder);
        request.source(sourceBuilder);
        SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        for (SearchHit hit : hits.getHits()){
            DeleteRequest deleteRequest = new DeleteRequest(index);
            deleteRequest.id(hit.getId());
            builder.add(deleteRequest);
        }
        //进行批量删除操作
        boolean optFlag = true;
        if(hits.getHits() != null && hits.getHits().length > 0){
            BulkResponse bulkResponse = restHighLevelClient.bulk(builder,RequestOptions.DEFAULT);
            if (bulkResponse.hasFailures()) {
                optFlag = false;
            }else {
                optFlag = true;
            }
        }
        return  optFlag;
    }
    /**
     * 递归解析json
     *
     * @param gradeBucketIt
     * @param map
     * @param sb
     */
    private void expainJson(Iterator<Terms.Bucket> gradeBucketIt,Map<String,Integer> map, StringBuffer sb) {
        while (gradeBucketIt.hasNext()) {
            Terms.Bucket b =  gradeBucketIt.next();
            if (b.getAggregations().asList().get(0) instanceof StringTerms) {
                StringTerms stringTermsCh = (StringTerms) b.getAggregations().asList().get(0);
                Iterator gradeBucketItCh = stringTermsCh.getBuckets().iterator();
                while (gradeBucketItCh.hasNext()) {
                    StringBuffer sbTemp = new StringBuffer((sb == null ? "" : (sb.toString() + "-")) + b.getKey());
                    expainJson(gradeBucketItCh, map, sbTemp);
                }
            }else if (b.getAggregations().asList().get(0) instanceof LongTerms) {
                LongTerms longTermsCh = (LongTerms) b.getAggregations().asList().get(0);
                Iterator gradeBucketItCh = longTermsCh.getBuckets().iterator();
                while (gradeBucketItCh.hasNext()) {
                    StringBuffer sbTemp = new StringBuffer((sb == null ? "" : (sb.toString() + "-")) + b.getKey());
                    expainJson(gradeBucketItCh, map, sbTemp);
                }
            }else if (b.getAggregations().asList().get(0) instanceof DoubleTerms) {
                DoubleTerms doubleTermsCh = (DoubleTerms) b.getAggregations().asList().get(0);
                Iterator gradeBucketItCh =doubleTermsCh.getBuckets().iterator();
                while (gradeBucketItCh.hasNext()) {
                    StringBuffer sbTemp = new StringBuffer((sb == null ? "" : (sb.toString() + "-")) + b.getKey());
                    expainJson(gradeBucketItCh, map, sbTemp);
                }
            }else if (b.getAggregations().asList().get(0) instanceof InternalValueCount) {//count(8)
                InternalValueCount count = (InternalValueCount) b.getAggregations().asList().get(0);
                StringBuffer sbTemp = new StringBuffer((sb == null ? "" : (sb.toString() + "-")) + b.getKey());
                map.put(sbTemp.toString() , (int)count.getValue());
            }else if (b.getAggregations().asList().get(0) instanceof InternalSum) {//(sum(xx))
                InternalSum count = (InternalSum) b.getAggregations().asList().get(0);
                StringBuffer sbTemp = new StringBuffer((sb == null ? "" : (sb.toString() + "-")) + b.getKey());
                map.put(sbTemp.toString() , (int)count.getValue());
            }
        }
    }
    /**
     * 执行sql查询es
     * @param sql
     * @return
     */
    public List<Map<String, Object>> excuteDataModel(String sql) {
        List<Map<String, Object>> returnModels = new ArrayList<>();
        Client client = (Client) restHighLevelClient.getLowLevelClient();
        try {
            SQLExprParser parser = new ElasticSqlExprParser(sql);
            SQLExpr expr = parser.expr();
            SQLQueryExpr queryExpr = (SQLQueryExpr) expr;
            Select select = null;
            select = new SqlParser().parseSelect(queryExpr);
            //通过抽象语法树,封装成自定义的Select,包含了select、from、where group、limit等
            AggregationQueryAction action = null;
            DefaultQueryAction queryAction = null;
            SqlElasticSearchRequestBuilder requestBuilder = null;
            if (select.isAgg) {
                //包含计算的的排序分组的
                action = new AggregationQueryAction(client, select);
                requestBuilder = action.explain();
            } else {
                //封装成自己的Select对象
                queryAction = new DefaultQueryAction(client, select);
                requestBuilder = queryAction.explain();
            }
            SearchResponse response = (SearchResponse) requestBuilder.get();
            Object queryResult = null;
            if (sql.toUpperCase().indexOf("GROUP") != -1 || sql.toUpperCase().indexOf("SUM") != -1 || sql.toUpperCase().indexOf("COUNT") != -1) {
                queryResult = response.getAggregations();
            } else {
                queryResult = response.getHits();
            }
            ObjectResult temp = new ObjectResultsExtractor(true, true, true,true,queryAction).extractResults(queryResult, true);
            List<String> heads = temp.getHeaders();
            temp.getLines().stream().forEach(one -> {
                try {
                    Map<String, Object> oneMap = new HashMap<String, Object>();
                    for (int i = 0; i < one.size(); i++) {
                        String key = null;
                        Object value = one.get(i);
                        key = heads.get(i);
                        oneMap.put(key, value);
                    }
                    returnModels.add(oneMap);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
        return returnModels;
    }
    public long getCountBySql(String sql) {
        try {
            Client client = (Client) restHighLevelClient.getLowLevelClient();
            SQLExprParser parser = new ElasticSqlExprParser(sql);
            SQLExpr expr = parser.expr();
            SQLQueryExpr queryExpr = (SQLQueryExpr) expr;
            Select select = null;
            select = new SqlParser().parseSelect(queryExpr);
            //通过抽象语法树,封装成自定义的Select,包含了select、from、where group、limit等
            AggregationQueryAction action = null;
            DefaultQueryAction queryAction = null;
            SqlElasticSearchRequestBuilder requestBuilder = null;
            if (select.isAgg) {
                //包含计算的的排序分组的
                action = new AggregationQueryAction(client, select);
                requestBuilder = action.explain();
            } else {
                //封装成自己的Select对象
                queryAction = new DefaultQueryAction(client, select);
                requestBuilder = queryAction.explain();
            }
            SearchResponse response = (SearchResponse) requestBuilder.get();
            SearchHits hits = response.getHits();
            if(hits != null){
                return hits.getTotalHits().value;
            }
            return 0;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }
    /**
     * 直接执行essql语句
@ -64,6 +453,7 @@ public class ElasticsearchUtil {
     * @throws Exception
     */
    public ObjectResult excuteSql(String sql) throws Exception{
        Client client = (Client) restHighLevelClient.getLowLevelClient();
        SQLExprParser parser = new ElasticSqlExprParser(sql);
        SQLExpr expr = parser.expr();
        SQLQueryExpr queryExpr = (SQLQueryExpr) expr;
@ -77,11 +467,11 @@ public class ElasticsearchUtil {
        SqlElasticSearchRequestBuilder requestBuilder = null;
        if (select.isAgg) {
            //包含计算的的排序分组的
            action = new AggregationQueryAction(elasticFactory.getTransportClient(), select);
            action = new AggregationQueryAction(client, select);
            requestBuilder = action.explain();
        } else {
            //封装成自己的Select对象
            queryAction = new DefaultQueryAction(elasticFactory.getTransportClient(), select);
            queryAction = new DefaultQueryAction(client, select);
            requestBuilder = queryAction.explain();
        }
        SearchResponse response = (SearchResponse) requestBuilder.get();
@ -91,13 +481,14 @@ public class ElasticsearchUtil {
        } else {
            queryResult = response.getHits();
        }
        ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(queryResult, true);
        ObjectResult temp = new ObjectResultsExtractor(true, true, true,true,queryAction).extractResults(queryResult, true);
        return temp;
    }
    public List excute(String sql, Class clazz, String esType, String esIndex) {
        List saveModels = new ArrayList<>();
        Client client = (Client) restHighLevelClient.getLowLevelClient();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXX");
        SimpleDateFormat dateFormat1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@ -120,11 +511,11 @@ public class ElasticsearchUtil {
            SqlElasticSearchRequestBuilder requestBuilder = null;
            if (select.isAgg) {
                //包含计算的的排序分组的
                action = new AggregationQueryAction(elasticFactory.getTransportClient(), select);
                action = new AggregationQueryAction(client, select);
                requestBuilder = action.explain();
            } else {
                //封装成自己的Select对象
                queryAction = new DefaultQueryAction(elasticFactory.getTransportClient(), select);
                queryAction = new DefaultQueryAction(client, select);
                requestBuilder = queryAction.explain();
            }
            SearchResponse response = (SearchResponse) requestBuilder.get();
@ -134,7 +525,7 @@ public class ElasticsearchUtil {
            } else {
                queryResult = response.getHits();
            }
            ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(queryResult, true);
            ObjectResult temp = new ObjectResultsExtractor(true, true, true,true,queryAction).extractResults(queryResult, true);
            List<String> heads = temp.getHeaders();
            temp.getLines().forEach(one -> {
                Object saveModel = null;
@ -175,12 +566,12 @@ public class ElasticsearchUtil {
                                        Date date = new Date();
                                        date = ts;
                                        value =date;
                                    
                                    } catch (Exception e2) {
                                        value = String.valueOf(one.get(i));
                                    }
                                }
                                
                            }
//                            value = DateUtil.strToDate(String.valueOf(value).replace("T00:00:00+0800", " 00:00:00"), "yyyy-MM-dd HH:mm:ss");
                        }
@ -224,6 +615,7 @@ public class ElasticsearchUtil {
    public Long excuteForLong(String sql, String esType, String esIndex) {
        try {
            Client client = (Client) restHighLevelClient.getLowLevelClient();
            SQLExprParser parser = new ElasticSqlExprParser(sql);
            SQLExpr expr = parser.expr();
            SQLQueryExpr queryExpr = (SQLQueryExpr) expr;
@ -237,15 +629,16 @@ public class ElasticsearchUtil {
            SqlElasticSearchRequestBuilder requestBuilder = null;
            if (select.isAgg) {
                //包含计算的的排序分组的
                action = new AggregationQueryAction(elasticFactory.getTransportClient(), select);
                action = new AggregationQueryAction(client, select);
                requestBuilder = action.explain();
            } else {
                //封装成自己的Select对象
                queryAction = new DefaultQueryAction(elasticFactory.getTransportClient(), select);
                queryAction = new DefaultQueryAction(client, select);
                requestBuilder = queryAction.explain();
            }
            SearchResponse response = (SearchResponse) requestBuilder.get();
            ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(response.getAggregations(), true);
            ObjectResult temp = new ObjectResultsExtractor(true, true, true,true,queryAction).extractResults(response.getAggregations(), true);
            Long Longvalue = ((Double) temp.getLines().get(0).get(0)).longValue();
            return Longvalue;
        } catch (Exception e) {
@ -266,6 +659,7 @@ public class ElasticsearchUtil {
    public Object excuteOneObject(String sql, Class clazz, String esType, String esIndex) {
        try {
            Client client = (Client) restHighLevelClient.getLowLevelClient();
            SQLExprParser parser = new ElasticSqlExprParser(sql);
            SQLExpr expr = parser.expr();
            SQLQueryExpr queryExpr = (SQLQueryExpr) expr;
@ -279,15 +673,15 @@ public class ElasticsearchUtil {
            SqlElasticSearchRequestBuilder requestBuilder = null;
            if (select.isAgg) {
                //包含计算的的排序分组的
                action = new AggregationQueryAction(elasticFactory.getTransportClient(), select);
                action = new AggregationQueryAction(client, select);
                requestBuilder = action.explain();
            } else {
                //封装成自己的Select对象
                queryAction = new DefaultQueryAction(elasticFactory.getTransportClient(), select);
                queryAction = new DefaultQueryAction(client, select);
                requestBuilder = queryAction.explain();
            }
            SearchResponse response = (SearchResponse) requestBuilder.get();
            ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(response.getHits(), true);
            ObjectResult temp = new ObjectResultsExtractor(true, true, true,true,queryAction).extractResults(response.getHits(), true);
            List<String> heads = temp.getHeaders();
            Object saveModel = clazz.newInstance();
            try {
@ -2631,60 +3025,7 @@ public class ElasticsearchUtil {
        return resultLevel;
    }
    public List<Map<String, Object>> excuteDataModel(String sql) {
        List<Map<String, Object>> returnModels = new ArrayList<>();
        try {
            SQLExprParser parser = new ElasticSqlExprParser(sql);
            SQLExpr expr = parser.expr();
            SQLQueryExpr queryExpr = (SQLQueryExpr) expr;
//            if (parser.getLexer().token() != Token.EOF) {
//                throw new ParserException("illegal sql expr : " + sql);
//            }
            Select select = null;
            select = new SqlParser().parseSelect(queryExpr);
            //通过抽象语法树,封装成自定义的Select,包含了select、from、where group、limit等
            AggregationQueryAction action = null;
            DefaultQueryAction queryAction = null;
            SqlElasticSearchRequestBuilder requestBuilder = null;
            if (select.isAgg) {
                //包含计算的的排序分组的
                action = new AggregationQueryAction(elasticFactory.getTransportClient(), select);
                requestBuilder = action.explain();
            } else {
                //封装成自己的Select对象
                Client client = elasticFactory.getTransportClient();
                queryAction = new DefaultQueryAction(client, select);
                requestBuilder = queryAction.explain();
            }
            SearchResponse response = (SearchResponse) requestBuilder.get();
            Object queryResult = null;
            if (sql.toUpperCase().indexOf("GROUP") != -1 || sql.toUpperCase().indexOf("SUM") != -1) {
                queryResult = response.getAggregations();
            } else {
                queryResult = response.getHits();
            }
            ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(queryResult, true);
            List<String> heads = temp.getHeaders();
            temp.getLines().stream().forEach(one -> {
                try {
                    Map<String, Object> oneMap = new HashMap<String, Object>();
                    for (int i = 0; i < one.size(); i++) {
                        String key = null;
                        Object value = one.get(i);
                        key = heads.get(i);
                        oneMap.put(key, value);
                    }
                    returnModels.add(oneMap);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
        return returnModels;
    }
    /**
     * 将slaveKey1作为筛选条件而不进行group by