wangzhinan hai 1 ano
pai
achega
06cf7520f0
Modificáronse 17 ficheiros con 475 adicións e 310 borrados
  1. 5 5
      starter/elasticsearch-starter/pom.xml
  2. 4 2
      starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/ElasticSearch7Helper.java
  3. 1 1
      starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/ElasticSearch7Pool.java
  4. 1 1
      starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/ElasticSearch7Util.java
  5. 4 5
      starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/TestEs7.java
  6. 13 8
      svr/svr-statistics-hlw/pom.xml
  7. 35 18
      svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/controller/QueryController.java
  8. 29 29
      svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/etl/save/SaveHelper.java
  9. 25 11
      svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/etl/save/es/ElasticFactory.java
  10. 112 112
      svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/etl/save/es/ElastricSearchSave.java
  11. 19 14
      svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/job/business/CurrentMysqlToEsQuotaJob.java
  12. 34 22
      svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/job/business/EsToEsQuotaJob.java
  13. 29 23
      svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/job/business/MysqlToEsQuotaJob.java
  14. 49 41
      svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/service/ExtractDataService.java
  15. 109 12
      svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/util/ElasticsearchUtil.java
  16. 5 5
      svr/svr-statistics-hlw/src/main/resources/application.yml
  17. 1 1
      wlyy-parent-pom/pom.xml

+ 5 - 5
starter/elasticsearch-starter/pom.xml

@ -44,11 +44,6 @@
            <artifactId>elasticsearch-java</artifactId>
            <version>7.17.4</version>
        </dependency>-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${version.druid}</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
@ -59,6 +54,11 @@
            <artifactId>elasticsearch-sql</artifactId>
            <version>7.8.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${version.druid}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>

+ 4 - 2
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/ElasticSearch7Helper.java

@ -250,8 +250,10 @@ public class ElasticSearch7Helper {
        //遍历查询结果
        for(SearchHit hit : searchHits){
            String datas = hit.getSourceAsString();
            listData.add(datas);
            logger.info(datas);
            String id = hit.getId();
            JSONObject object =  JSONObject.parseObject(datas);
            object.put("id",id);
            listData.add(object.toJSONString());
        }
        return listData;
    }

+ 1 - 1
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/ElasticSearch7Pool.java

@ -100,7 +100,7 @@ public class ElasticSearch7Pool {
    }
    public ResultSet restHighLevelClientStream(String sql) throws Exception {
        Connection connection = DriverManager.getConnection("jdbc:es://http://172.26.0.56:9200","elastic","elastic");
        Connection connection = DriverManager.getConnection("jdbc:es://http://172.26.0.55:9200","elastic","elastic");
        Statement statement = connection.createStatement();
        ResultSet resultSet =  statement.executeQuery(sql);
//        connection.close();

+ 1 - 1
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/ElasticSearch7Util.java

@ -159,7 +159,7 @@ public class ElasticSearch7Util {
        //根据索引、查询条件构建查询构造器
        BoolQueryBuilder boolQueryBuilder = createQueryBuilderBySql(sql);
        //将查询构造器注入SearchSourceBuilder
        searchBuilder.query(boolQueryBuilder);
        searchBuilder.query(boolQueryBuilder).sort("quotaDate",SortOrder.DESC).sort("createTime",SortOrder.DESC);
        //设置请求查询的索引(查询构造器中已指定,无需重复设置)
        //request.indices(indexName);
        //将构建好的SearchSourceBuilder注入请求

+ 4 - 5
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/TestEs7.java

@ -19,7 +19,6 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.nlpcn.es4sql.domain.Where;
import org.nlpcn.es4sql.exception.SqlParseException;
@ -35,7 +34,7 @@ public class TestEs7 {
    public static void main(String[] args) throws Exception{
        String host = "127.0.0.1";
        String host = "172.26.0.55";
        String userName = "elastic";
        String password = "elastic";
@ -69,10 +68,10 @@ public class TestEs7 {
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);
        String index = "body_health_data";
        String index = "hlw_quota_test";
        /*GetIndexRequest request = new GetIndexRequest(index);
        boolean exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);*/
        GetRequest getRequest = new GetRequest("body_health_data","1");
        GetRequest getRequest = new GetRequest("hlw_quota_test","1");
        GetResponse documentFields = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
        System.out.println(documentFields);
@ -106,7 +105,7 @@ public class TestEs7 {
        SQLExprParser parser = new ElasticSqlExprParser(sql);
        SQLExpr expr = parser.expr();
        if (parser.getLexer().token() != Token.EOF) {
        if (parser.getLexer().token() == Token.EOF) {
            throw new ParserException("illegal sql expr : " + sql);
        }
        return expr;

+ 13 - 8
svr/svr-statistics-hlw/pom.xml

@ -81,20 +81,25 @@
        <!--elasticsearch start-->
        <dependency>
            <groupId>com.yihu.jw</groupId>
            <artifactId>elasticsearch-starter</artifactId>
            <version>${version.wlyy-common}</version>
        </dependency>
       <!-- <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${version.elasticsearch}</version>
        </dependency>
        <dependency>
        </dependency>-->
      <!--  <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>${version.jest}</version>
        </dependency>
        <dependency>
        </dependency>-->
        <!--<dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>shield</artifactId>
            <version>${version.elasticsearch}</version>
        </dependency>
        </dependency>-->
        <!--注释掉就不会读取git的配置,只会读取yml中的配置-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
@ -206,13 +211,13 @@
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.15</version>
            <version>${version.druid}</version>
        </dependency>
        <dependency>
      <!--  <dependency>
            <groupId>org.nlpcn</groupId>
            <artifactId>elasticsearch-sql</artifactId>
            <version>2.4.2.1</version>
        </dependency>
        </dependency>-->
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>delete-by-query</artifactId>

+ 35 - 18
svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/controller/QueryController.java

@ -1,7 +1,9 @@
package com.yihu.jw.statistics.controller;
import com.alibaba.fastjson.JSON;
import com.yihu.jw.elasticsearch.ElasticSearch7Helper;
import com.yihu.jw.elasticsearch.ElasticSearch7Util;
import com.yihu.jw.statistics.etl.cache.Cache;
import com.yihu.jw.statistics.etl.save.es.ElasticFactory;
import com.yihu.jw.statistics.util.DateUtil;
import com.yihu.jw.statistics.util.ElasticsearchUtil;
import com.yihu.jw.statistics.vo.SaveModel;
@ -10,6 +12,12 @@ import io.searchbox.core.*;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -21,9 +29,11 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.logging.Logger;
/**
 * Created by chenweida on 2017/7/1.
@ -35,8 +45,14 @@ public class QueryController {
    @Autowired
    private ElasticsearchUtil elasticsearchUtil;
    /*@Autowired
    private ElasticFactory elasticFactory;*/
    @Autowired
    private ElasticFactory elasticFactory;
    private ElasticSearch7Util elasticSearch7Util;
    @Autowired
    private ElasticSearch7Helper elasticSearch7Helper;
    @Resource(name="restHighLevelClient")
    private RestHighLevelClient restHighLevelClient;
    @Value("${es.type}")
    private String esType;
    @Value("${es.index}")
@ -44,9 +60,9 @@ public class QueryController {
    @ApiOperation(value = "执行sql")
    @RequestMapping(value = "/excuteSQL", method = RequestMethod.GET)
    public List<SaveModel> excuteSQL(
            @ApiParam(name = "sql", value = "执行的", required = true) @RequestParam(value = "sql", required = true) String sql) {
        List<SaveModel> saveModels = elasticsearchUtil.excute(sql);
    public List<Map<String,Object>> excuteSQL(
            @ApiParam(name = "sql", value = "执行的", required = true) @RequestParam(value = "sql", required = true) String sql) throws Exception {
        List<Map<String,Object>> saveModels = elasticSearch7Util.executeSQL(sql);
        return saveModels;
    }
@ -233,9 +249,7 @@ public class QueryController {
    private net.sf.json.JSONObject deleteData(Date quotaDate, String quotaCode) {
        net.sf.json.JSONObject jsonObject = new net.sf.json.JSONObject();
        JestClient jestClient = null;
        try {
            jestClient = elasticFactory.getJestClient();
            //先根据条件查找出来
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
@ -252,27 +266,30 @@ public class QueryController {
            Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex(esIndex).addType(esType)
                    .build();
            SearchResult result = jestClient.execute(search);
            List<SaveModel> saveModels = result.getSourceAsObjectList(SaveModel.class);
            List<String> result = elasticSearch7Helper.search(esIndex,searchSourceBuilder);
            List<SaveModel> saveModels = new ArrayList<>();
            for (String s:result){
                SaveModel saveModel = new SaveModel();
                saveModel = JSON.parseObject(s,SaveModel.class);
                saveModels.add(saveModel);
            }
            //根据id批量删除
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout(TimeValue.timeValueSeconds(10));
            for (SaveModel obj : saveModels) {
                Delete index = new Delete.Builder(obj.getId()).build();
                bulk.addAction(index);
                bulkRequest.add(new DeleteRequest(esIndex)
                        .id(obj.getId()));
            }
            BulkResult br = jestClient.execute(bulk.build());
            jsonObject.put("flag", br.isSucceeded());
            BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            Logger.getLogger(bulkResponse.buildFailureMessage());
            jsonObject.put("flag", !bulkResponse.hasFailures());
            jsonObject.put("count", saveModels.size());
            return jsonObject;
        } catch (Exception e) {
            jsonObject.put("flag", false);
            jsonObject.put("message", e.getMessage());
            return jsonObject;
        } finally {
            if (jestClient != null) {
                jestClient.shutdownClient();
            }
        }
    }
}

+ 29 - 29
svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/etl/save/SaveHelper.java

@ -1,29 +1,29 @@
package com.yihu.jw.statistics.etl.save;
import com.yihu.jw.statistics.etl.save.es.ElastricSearchSave;
import com.yihu.jw.statistics.util.SpringUtil;
import com.yihu.jw.statistics.vo.SaveModel;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Created by chenweida on 2017/6/2.
 */
@Component
@Scope("prototype")
public class SaveHelper {
    public Boolean save(List<SaveModel> sms) {
        return SpringUtil.getBean(ElastricSearchSave.class).save(sms);
    }
    public Boolean update(List<SaveModel> sms) {
        return SpringUtil.getBean(ElastricSearchSave.class).update(sms);
    }
}
//package com.yihu.jw.statistics.etl.save;
//
//import com.yihu.jw.statistics.etl.save.es.ElastricSearchSave;
//import com.yihu.jw.statistics.util.SpringUtil;
//import com.yihu.jw.statistics.vo.SaveModel;
//import org.springframework.context.annotation.Scope;
//import org.springframework.stereotype.Component;
//
//import java.util.List;
//
///**
// * Created by chenweida on 2017/6/2.
// */
//@Component
//@Scope("prototype")
//public class SaveHelper {
//
//    public Boolean save(List<SaveModel> sms) {
//        return SpringUtil.getBean(ElastricSearchSave.class).save(sms);
//
//
//    }
//
//    public Boolean update(List<SaveModel> sms) {
//        return SpringUtil.getBean(ElastricSearchSave.class).update(sms);
//
//
//    }
//}

+ 25 - 11
svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/etl/save/es/ElasticFactory.java

@ -1,3 +1,4 @@
/*
package com.yihu.jw.statistics.etl.save.es;
import io.searchbox.client.JestClient;
@ -17,9 +18,11 @@ import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
*/
/**
 * Created by chenweida on 2017/6/5.
 */
 *//*
@Component
public class ElasticFactory {
    private static JestClientFactory factory = null;
@ -40,10 +43,12 @@ public class ElasticFactory {
    private String password;
//-----------------------------------jestClient----------------------------------------
    /**
    */
/**
     * @param "http://localhost:9200"
     * @return
     */
     *//*
    public JestClient getJestClient() {
        if (factory == null) {
            //初始化链接
@ -52,10 +57,12 @@ public class ElasticFactory {
        return factory.getObject();
    }
    /**
    */
/**
     * 初始化链接
     * 9200
     */
     *//*
    public synchronized void init() {
        String[] hostArray = esHost.split(",");
        // Construct a new Jest client according to configuration via factory
@ -93,11 +100,13 @@ public class ElasticFactory {
        return null;
    }
    /**
    */
/**
     * 9300
     * 互联网医院版本需要密码
     * @throws UnknownHostException
     */
     *//*
    private synchronized void initTranClientPw() throws Exception {
        if (transportClient == null) {
            String[] hosts = tHost.split(",");
@ -111,11 +120,13 @@ public class ElasticFactory {
    }
    /**
    */
/**
     * 9300
     *
     * @throws UnknownHostException
     */
     *//*
    private synchronized void initTranClient() throws UnknownHostException {
        if (transportClient == null) {
            String[] hosts = tHost.split(",");
@ -133,11 +144,13 @@ public class ElasticFactory {
        }
    }
    /**
    */
/**
     * 配置连接
     * @return
     * @throws Exception
     */
     *//*
    private Settings getSettings() throws Exception {
        Settings.Builder settingBuilder = Settings.settingsBuilder();
        settingBuilder.put("cluster.name", clusterName);
@ -147,3 +160,4 @@ public class ElasticFactory {
        return settingBuilder.build();
    }
}
*/

+ 112 - 112
svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/etl/save/es/ElastricSearchSave.java

@ -1,112 +1,112 @@
package com.yihu.jw.statistics.etl.save.es;
import com.alibaba.fastjson.JSONObject;
import com.yihu.jw.statistics.vo.SaveModel;
import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Index;
import io.searchbox.core.Update;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
;
/**
 * Created by chenweida on 2017/6/2.
 */
@Component
@Scope("prototype")
public class ElastricSearchSave {
    @Value("${es.type}")
    private String esType;
    @Value("${es.index}")
    private String esIndex;
    private Logger logger = LoggerFactory.getLogger(ElastricSearchSave.class);
    @Autowired
    private ElasticFactory elasticFactory;
    public Boolean save(List<SaveModel> sms) {
        JestClient jestClient = null;
        try {
            //得到链接
            jestClient = elasticFactory.getJestClient();
            int success = 0;
            int error = 0;
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
            for (SaveModel obj : sms) {
                try {
                    obj.setCreateTime(new Date());
                    Index index = new Index.Builder(obj).build();
                    success++;
                    bulk.addAction(index);
                } catch (Exception e) {
                    logger.error(e.getMessage());
                    error++;
                }
            }
            BulkResult br = jestClient.execute(bulk.build());
            logger.info("save flag:" + br.isSucceeded());
            logger.info("save success:" + success);
            logger.info("save error:" + error);
            return br.isSucceeded();
        } catch (Exception e) {
            logger.error(" save error :" + e.getMessage());
        } finally {
            if (jestClient != null) {
                jestClient.shutdownClient();
            }
        }
        return null;
    }
    public Boolean update(List<SaveModel> sms) {
        JestClient jestClient = null;
        try {
            //得到链接
            jestClient = elasticFactory.getJestClient();
            int success = 0;
            int error = 0;
            boolean isSuccessed = true;
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
            for (SaveModel obj : sms) {
                try {
                    JSONObject jo = new JSONObject();
                    jo.put("doc", obj);
                    Update index = new Update.Builder(jo.toString()).index(esIndex).type(esType).id(obj.getId()).build();
                    bulk.addAction(index);
                    success++;
                } catch (Exception e) {
                    error++;
                    isSuccessed = false;
                }
            }
            BulkResult br = jestClient.execute(bulk.build());
            logger.info("update flag:" + br.isSucceeded());
            logger.info("update success:" + success);
            logger.info("update error:" + error);
            jestClient.shutdownClient();
            return isSuccessed;
        } catch (Exception e) {
            logger.error(" update error :" + e.getMessage());
        } finally {
            if (jestClient != null) {
                jestClient.shutdownClient();
            }
        }
        return null;
    }
}
//package com.yihu.jw.statistics.etl.save.es;
//
//import com.alibaba.fastjson.JSONObject;
//import com.yihu.jw.statistics.vo.SaveModel;
//import io.searchbox.client.JestClient;
//import io.searchbox.core.Bulk;
//import io.searchbox.core.BulkResult;
//import io.searchbox.core.Index;
//import io.searchbox.core.Update;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Scope;
//import org.springframework.stereotype.Component;
//
//import java.util.Date;
//import java.util.List;
//
//;
//
///**
// * Created by chenweida on 2017/6/2.
// */
//@Component
//@Scope("prototype")
//public class ElastricSearchSave {
//    @Value("${es.type}")
//    private String esType;
//    @Value("${es.index}")
//    private String esIndex;
//
//    private Logger logger = LoggerFactory.getLogger(ElastricSearchSave.class);
//    @Autowired
//    private ElasticFactory elasticFactory;
//
//    public Boolean save(List<SaveModel> sms) {
//        JestClient jestClient = null;
//        try {
//            //得到链接
//            jestClient = elasticFactory.getJestClient();
//
//            int success = 0;
//            int error = 0;
//            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
//            for (SaveModel obj : sms) {
//                try {
//                    obj.setCreateTime(new Date());
//                    Index index = new Index.Builder(obj).build();
//                    success++;
//                    bulk.addAction(index);
//                } catch (Exception e) {
//                    logger.error(e.getMessage());
//                    error++;
//                }
//            }
//            BulkResult br = jestClient.execute(bulk.build());
//
//            logger.info("save flag:" + br.isSucceeded());
//            logger.info("save success:" + success);
//            logger.info("save error:" + error);
//            return br.isSucceeded();
//        } catch (Exception e) {
//            logger.error(" save error :" + e.getMessage());
//        } finally {
//            if (jestClient != null) {
//                jestClient.shutdownClient();
//            }
//        }
//        return null;
//    }
//
//
//    public Boolean update(List<SaveModel> sms) {
//        JestClient jestClient = null;
//        try {
//            //得到链接
//            jestClient = elasticFactory.getJestClient();
//
//            int success = 0;
//            int error = 0;
//            boolean isSuccessed = true;
//            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
//            for (SaveModel obj : sms) {
//                try {
//                    JSONObject jo = new JSONObject();
//                    jo.put("doc", obj);
//                    Update index = new Update.Builder(jo.toString()).index(esIndex).type(esType).id(obj.getId()).build();
//                    bulk.addAction(index);
//                    success++;
//                } catch (Exception e) {
//                    error++;
//                    isSuccessed = false;
//                }
//            }
//
//            BulkResult br = jestClient.execute(bulk.build());
//            logger.info("update flag:" + br.isSucceeded());
//            logger.info("update success:" + success);
//            logger.info("update error:" + error);
//            jestClient.shutdownClient();
//            return isSuccessed;
//        } catch (Exception e) {
//            logger.error(" update error :" + e.getMessage());
//        } finally {
//            if (jestClient != null) {
//                jestClient.shutdownClient();
//            }
//        }
//        return null;
//    }
//}

+ 19 - 14
svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/job/business/CurrentMysqlToEsQuotaJob.java

@ -1,6 +1,8 @@
package com.yihu.jw.statistics.job.business;
import com.alibaba.fastjson.JSON;
import com.yihu.jw.elasticsearch.ElasticSearch7Helper;
import com.yihu.jw.entity.quota.job.QuartzJobConfig;
import com.yihu.jw.entity.quota.job.QuartzJobLog;
import com.yihu.jw.statistics.dao.QuartzJobConfigDao;
@ -12,8 +14,6 @@ import com.yihu.jw.statistics.etl.convert.ConvertHelper;
import com.yihu.jw.statistics.etl.extract.ExtractHelper;
import com.yihu.jw.statistics.etl.extract.db.Data2Save;
import com.yihu.jw.statistics.etl.filter.FilterHelper;
import com.yihu.jw.statistics.etl.save.SaveHelper;
import com.yihu.jw.statistics.etl.save.es.ElasticFactory;
import com.yihu.jw.statistics.util.DateUtil;
import com.yihu.jw.statistics.util.SpringUtil;
import com.yihu.jw.statistics.vo.DataModel;
@ -24,6 +24,7 @@ import io.searchbox.client.JestClient;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import net.sf.json.JSONArray;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -41,6 +42,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import org.springframework.web.context.support.SpringBeanAutowiringSupport;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.*;
@ -67,8 +69,6 @@ public class CurrentMysqlToEsQuotaJob implements Job {
    @Autowired
    private WlyyDimensionQuotaDao dimensionQuotaDao;
    @Autowired
    private ElasticFactory elasticFactory;
    @Value("${es.type}")
    private String esType;
    @Value("${es.index}")
@ -81,6 +81,12 @@ public class CurrentMysqlToEsQuotaJob implements Job {
    private Data2Save data2Save;
    private String incrementInterval;//增量的时间间隔(天)
    @Resource(name="restHighLevelClient")
    private RestHighLevelClient restHighLevelClient;
    @Autowired
    private ElasticSearch7Helper elasticSearch7Helper;
    public void execute(JobExecutionContext context)
            throws JobExecutionException {
@ -253,9 +259,7 @@ public class CurrentMysqlToEsQuotaJob implements Job {
     * @param timeLevel
     */
    private boolean updateData(List<SaveModel> sms, Date quotaDate, String quotaCode, String timeLevel) {
        JestClient jestClient = null;
        try {
            jestClient = elasticFactory.getJestClient();
            //先根据条件查找出来
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(
@ -267,9 +271,13 @@ public class CurrentMysqlToEsQuotaJob implements Job {
            Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex(esIndex).addType(esType)
                    .build();
            SearchResult result = jestClient.execute(search);
            List<SaveModel> quarySaveModels = result.getSourceAsObjectList(SaveModel.class);
            List<String> result = elasticSearch7Helper.search(esIndex,searchSourceBuilder);
            List<SaveModel> quarySaveModels = new ArrayList<>();
            for (String s:result){
                SaveModel saveModel = new SaveModel();
                saveModel = JSON.parseObject(s,SaveModel.class);
                quarySaveModels.add(saveModel);
            }
            //如果之前有值就用查询出来的然后修改数目即可
            if (quarySaveModels != null && quarySaveModels.size() > 0) {
                List<SaveModel> saveModels = new ArrayList<>();
@ -298,16 +306,13 @@ public class CurrentMysqlToEsQuotaJob implements Job {
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jestClient != null) {
                jestClient.shutdownClient();
            }
        }
        return false;
    }
    private boolean updateDate(List<SaveModel> sms) {
        try {
            return SpringUtil.getBean(SaveHelper.class).update(sms);
            return SpringUtil.getBean(ElasticSearch7Helper.class).update(esIndex,sms);
        } catch (Exception e) {
            logger.error("save error:" + e.getMessage());
        }
@ -335,7 +340,7 @@ public class CurrentMysqlToEsQuotaJob implements Job {
     */
    private Boolean saveDate(List<SaveModel> sms) {
        try {
            return SpringUtil.getBean(SaveHelper.class).save(sms);
            return SpringUtil.getBean(ElasticSearch7Helper.class).save(esIndex,sms);
        } catch (Exception e) {
            logger.error("save error:" + e.getMessage());
        }

+ 34 - 22
svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/job/business/EsToEsQuotaJob.java

@ -1,5 +1,8 @@
package com.yihu.jw.statistics.job.business;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yihu.jw.elasticsearch.ElasticSearch7Helper;
import com.yihu.jw.entity.quota.job.QuartzJobConfig;
import com.yihu.jw.entity.quota.job.QuartzJobLog;
import com.yihu.jw.statistics.dao.QuartzJobConfigDao;
@ -9,13 +12,17 @@ import com.yihu.jw.statistics.etl.convert.ConvertHelper;
import com.yihu.jw.statistics.etl.extract.ExtractHelper;
import com.yihu.jw.statistics.etl.extract.db.Data2Save;
import com.yihu.jw.statistics.etl.filter.FilterHelper;
import com.yihu.jw.statistics.etl.save.SaveHelper;
import com.yihu.jw.statistics.etl.save.es.ElasticFactory;
import com.yihu.jw.statistics.util.DateUtil;
import com.yihu.jw.statistics.util.SpringUtil;
import com.yihu.jw.statistics.vo.*;
import io.searchbox.client.JestClient;
import io.searchbox.core.*;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -32,6 +39,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import org.springframework.web.context.support.SpringBeanAutowiringSupport;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.*;
@ -58,8 +66,15 @@ public class EsToEsQuotaJob implements Job {
    @Autowired
    private QuartzJobConfigDao quartzJobConfigDao;
    /*@Autowired
    private ElasticFactory elasticFactory;*/
    @Resource(name="restHighLevelClient")
    private RestHighLevelClient restHighLevelClient;
    @Autowired
    private ElasticFactory elasticFactory;
    private ElasticSearch7Helper elasticSearch7Helper;
    @Value("${es.type}")
    private String esType;
    @Value("${es.index}")
@ -166,9 +181,7 @@ public class EsToEsQuotaJob implements Job {
     * @param timeLevel
     */
    private void deleteData(Date quotaDate, String quotaCode, String timeLevel) {
        JestClient jestClient = null;
        try {
            jestClient = elasticFactory.getJestClient();
            //先根据条件查找出来
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(
@ -178,32 +191,31 @@ public class EsToEsQuotaJob implements Job {
                            .must(QueryBuilders.matchQuery("quotaDate", quotaDate))
            ).size(500000);//一次取10000条
            Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex(esIndex).addType(esType)
                    .build();
            SearchResult result = jestClient.execute(search);
            List<SaveModel> saveModels = result.getSourceAsObjectList(SaveModel.class);
            List<String> result = elasticSearch7Helper.search(esIndex,searchSourceBuilder);
            List<SaveModel> saveModels = new ArrayList<>();
            for (String s:result){
                SaveModel saveModel = new SaveModel();
                saveModel = JSON.parseObject(s,SaveModel.class);
                saveModels.add(saveModel);
            }
            //根据id批量删除
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout(TimeValue.timeValueSeconds(10));
            for (SaveModel obj : saveModels) {
                Delete index = new Delete.Builder(obj.getId()).build();
                bulk.addAction(index);
                bulkRequest.add(new DeleteRequest(esIndex)
                        .id(obj.getId()));
            }
            BulkResult br = jestClient.execute(bulk.build());
            BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            logger.info(bulkResponse.buildFailureMessage());
            logger.info("delete data count:" + saveModels.size());
            logger.info("delete flag:" + br.isSucceeded());
            logger.info("delete flag:" +!bulkResponse.hasFailures());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jestClient != null) {
                jestClient.shutdownClient();
            }
        }
    }
    @Transactional
    private void saveLog(QuartzJobLog tjQuotaLog) {
    public void saveLog(QuartzJobLog tjQuotaLog) {
        quartzJobLogDao.save(tjQuotaLog);
    }
@ -223,7 +235,7 @@ public class EsToEsQuotaJob implements Job {
     */
    private Boolean saveDate(List<SaveModel> sms) {
        try {
            return SpringUtil.getBean(SaveHelper.class).save(sms);
            return SpringUtil.getBean(ElasticSearch7Helper.class).save(esIndex,sms);
        } catch (Exception e) {
            logger.error("save error:" + e.getMessage());
        }

+ 29 - 23
svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/job/business/MysqlToEsQuotaJob.java

@ -1,6 +1,8 @@
package com.yihu.jw.statistics.job.business;
import com.alibaba.fastjson.JSON;
import com.yihu.jw.elasticsearch.ElasticSearch7Helper;
import com.yihu.jw.entity.quota.job.QuartzJobConfig;
import com.yihu.jw.entity.quota.job.QuartzJobLog;
import com.yihu.jw.statistics.dao.QuartzJobConfigDao;
@ -11,8 +13,6 @@ import com.yihu.jw.statistics.etl.compute.ComputeHelper;
import com.yihu.jw.statistics.etl.convert.ConvertHelper;
import com.yihu.jw.statistics.etl.extract.ExtractHelper;
import com.yihu.jw.statistics.etl.filter.FilterHelper;
import com.yihu.jw.statistics.etl.save.SaveHelper;
import com.yihu.jw.statistics.etl.save.es.ElasticFactory;
import com.yihu.jw.statistics.util.DateUtil;
import com.yihu.jw.statistics.util.SpringUtil;
import com.yihu.jw.statistics.vo.BaseDimensionQuota;
@ -22,6 +22,12 @@ import com.yihu.jw.statistics.vo.SaveModel;
import io.searchbox.client.JestClient;
import io.searchbox.core.*;
import net.sf.json.JSONArray;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -38,7 +44,9 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import org.springframework.web.context.support.SpringBeanAutowiringSupport;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@ -67,9 +75,10 @@ public class MysqlToEsQuotaJob implements Job {
    private WlyyDimensionQuotaDao dimensionQuotaDao;
    @Autowired
    private QuartzJobConfigDao quartzJobConfigDao;
    @Autowired
    private ElasticFactory elasticFactory;
    private ElasticSearch7Helper elasticSearch7Helper;
    @Resource(name="restHighLevelClient")
    private RestHighLevelClient restHighLevelClient;
    @Value("${es.type}")
    private String esType;
    @Value("${es.index}")
@ -187,9 +196,7 @@ public class MysqlToEsQuotaJob implements Job {
     * @param timeLevel
     */
    private void deleteData(Date quotaDate, String quotaCode, String timeLevel) {
        JestClient jestClient = null;
        try {
            jestClient = elasticFactory.getJestClient();
            //先根据条件查找出来
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(
@ -197,29 +204,28 @@ public class MysqlToEsQuotaJob implements Job {
                            .must(QueryBuilders.matchQuery("quotaCode", quotaCode))
                            .must(QueryBuilders.matchQuery("timeLevel", timeLevel))
                            .must(QueryBuilders.matchQuery("quotaDate", quotaDate))
            ).size(500000);//一次取10000条
            Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex(esIndex).addType(esType)
                    .build();
            SearchResult result = jestClient.execute(search);
            List<SaveModel> saveModels = result.getSourceAsObjectList(SaveModel.class);
            ).size(10000);//一次取10000条
            List<String> result = elasticSearch7Helper.search(esIndex,searchSourceBuilder);
            List<SaveModel> saveModels = new ArrayList<>();
            for (String s:result){
                SaveModel saveModel = new SaveModel();
                saveModel = JSON.parseObject(s,SaveModel.class);
                saveModels.add(saveModel);
            }
            //根据id批量删除
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout(TimeValue.timeValueSeconds(10));
            for (SaveModel obj : saveModels) {
                Delete index = new Delete.Builder(obj.getId()).build();
                bulk.addAction(index);
                bulkRequest.add(new DeleteRequest(esIndex)
                        .id(obj.getId()));
            }
            BulkResult br = jestClient.execute(bulk.build());
            BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            logger.info(bulkResponse.buildFailureMessage());
            logger.info("delete data count:" + saveModels.size());
            logger.info("delete flag:" + br.isSucceeded());
            logger.info("delete flag:" + !bulkResponse.hasFailures());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jestClient != null) {
                jestClient.shutdownClient();
            }
        }
    }
@ -244,7 +250,7 @@ public class MysqlToEsQuotaJob implements Job {
     */
    private Boolean saveDate(List<SaveModel> sms) {
        try {
            return SpringUtil.getBean(SaveHelper.class).save(sms);
            return SpringUtil.getBean(ElasticSearch7Helper.class).save(esIndex,sms);
        } catch (Exception e) {
            logger.error("save error:" + e.getMessage());
        }

+ 49 - 41
svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/service/ExtractDataService.java

@ -1,12 +1,18 @@
package com.yihu.jw.statistics.service;
import com.alibaba.fastjson.JSON;
import com.yihu.jw.elasticsearch.ElasticSearch7Helper;
import com.yihu.jw.entity.quota.WlyyQuotaResult;
import com.yihu.jw.statistics.etl.save.es.ElasticFactory;
import com.yihu.jw.statistics.etl.save.es.ElastricSearchSave;
import com.yihu.jw.statistics.util.DateUtil;
import com.yihu.jw.statistics.vo.SaveModel;
import io.searchbox.client.JestClient;
import io.searchbox.core.*;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -20,7 +26,9 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@ -34,14 +42,16 @@ import java.util.List;
@Service
public class ExtractDataService {
    @Autowired
    private ElasticFactory elasticFactory;
    @Resource(name="restHighLevelClient")
    private RestHighLevelClient restHighLevelClient;
    @Value("${es.type}")
    private String esType;
    @Value("${es.index}")
    private String esIndex;
    /*@Autowired
    private ElastricSearchSave elastricSearchSave;//只读库的jdbcTemplate*/
    @Autowired
    private ElastricSearchSave elastricSearchSave;//只读库的jdbcTemplate
    private ElasticSearch7Helper elasticSearch7Helper;
    private JdbcTemplate jdbcTemplate;
    /**
@ -58,7 +68,7 @@ public class ExtractDataService {
    private Logger logger = LoggerFactory.getLogger(ExtractDataService.class);
    public void extractOneDate(String date) {
    public void extractOneDate(String date) throws IOException {
        String sql = "select * from wlyy_quota_result w where w.level1_type=1  and w.quato_code !=18 and w.quato_code !=19 and quota_date='" + date + "' ";
        List<WlyyQuotaResult> quotaResults = jdbcTemplate.query(sql, new BeanPropertyRowMapper(WlyyQuotaResult.class));
        save2es(quotaResults, null, date);
@ -66,7 +76,7 @@ public class ExtractDataService {
    }
    public void extractOneDateWithId(String date, String quotaId) {
    public void extractOneDateWithId(String date, String quotaId) throws IOException {
        String sql = "select * from wlyy_quota_result w where w.level1_type=1  and w.quato_code !=18 and w.quato_code !=19 and quota_date='" + date + "' and quato_code='" + quotaId + "'";
        List<WlyyQuotaResult> quotaResults = jdbcTemplate.query(sql, new BeanPropertyRowMapper(WlyyQuotaResult.class));
        save2es(quotaResults, quotaId, date);
@ -123,9 +133,7 @@ public class ExtractDataService {
    }
    private void deleteData(Date quotaDate, String quotaCode) {
        JestClient jestClient = null;
        try {
            jestClient = elasticFactory.getJestClient();
            //先根据条件查找出来
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(
@ -134,62 +142,62 @@ public class ExtractDataService {
                            .must(QueryBuilders.matchQuery("quotaDate", quotaDate))
            ).size(50000);//一次取10000条
            Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex(esIndex).addType(esType)
                    .build();
            SearchResult result = jestClient.execute(search);
            List<SaveModel> saveModels = result.getSourceAsObjectList(SaveModel.class);
            List<String> result = elasticSearch7Helper.search(esIndex,searchSourceBuilder);
            List<SaveModel> saveModels = new ArrayList<>();
            for (String s:result){
                SaveModel saveModel = new SaveModel();
                saveModel = JSON.parseObject(s,SaveModel.class);
                saveModels.add(saveModel);
            }
            //根据id批量删除
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout(TimeValue.timeValueSeconds(10));
            for (SaveModel obj : saveModels) {
                Delete index = new Delete.Builder(obj.getId()).build();
                bulk.addAction(index);
                bulkRequest.add(new DeleteRequest(esIndex)
                        .id(obj.getId()));
            }
            BulkResult br = jestClient.execute(bulk.build());
            BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            logger.info(bulkResponse.buildFailureMessage());
            logger.info("delete data count:" + saveModels.size());
            logger.info("delete flag:" +!bulkResponse.hasFailures());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jestClient != null) {
                jestClient.shutdownClient();
            }
        }
    }
    private void deleteData(Date quotaDate) {
        JestClient jestClient = null;
        try {
             jestClient = elasticFactory.getJestClient();
            //先根据条件查找出来
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(
                    new BoolQueryBuilder()
                            .must(QueryBuilders.matchQuery("quotaDate", quotaDate))
            ).size(500000);//一次取10000条
            Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex(esIndex).addType(esType)
                    .build();
            SearchResult result = jestClient.execute(search);
            List<SaveModel> saveModels = result.getSourceAsObjectList(SaveModel.class);
            List<String> result = elasticSearch7Helper.search(esIndex,searchSourceBuilder);
            List<SaveModel> saveModels = new ArrayList<>();
            for (String s:result){
                SaveModel saveModel = new SaveModel();
                saveModel = JSON.parseObject(s,SaveModel.class);
                saveModels.add(saveModel);
            }
            //根据id批量删除
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(esIndex).defaultType(esType);
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout(TimeValue.timeValueSeconds(10));
            for (SaveModel obj : saveModels) {
                Delete index = new Delete.Builder(obj.getId()).build();
                bulk.addAction(index);
                bulkRequest.add(new DeleteRequest(esIndex)
                        .id(obj.getId()));
            }
            BulkResult br = jestClient.execute(bulk.build());
            BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            logger.info(bulkResponse.buildFailureMessage());
            logger.info("delete data count:" + saveModels.size());
            logger.info("delete flag:" +!bulkResponse.hasFailures());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jestClient != null) {
                jestClient.shutdownClient();
            }
        }
    }
    private void save2es(List<WlyyQuotaResult> quotaResults, String quotaCode, String quotaDate) {
    private void save2es(List<WlyyQuotaResult> quotaResults, String quotaCode, String quotaDate) throws IOException {
        List<SaveModel> saveModel = new ArrayList<>();
        if (quotaResults != null && quotaResults.size() > 0) {
            //查询ES是否有这天的数据有就删除
@ -289,7 +297,7 @@ public class ExtractDataService {
                saveModel.add(saveModelTemp);
            });
            elastricSearchSave.save(saveModel);
            elasticSearch7Helper.save(esIndex,saveModel);
        }
    }
}

+ 109 - 12
svr/svr-statistics-hlw/src/main/java/com/yihu/jw/statistics/util/ElasticsearchUtil.java

@ -2,26 +2,43 @@ package com.yihu.jw.statistics.util;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
import com.alibaba.druid.sql.parser.ParserException;
import com.alibaba.druid.sql.parser.SQLExprParser;
import com.yihu.jw.statistics.etl.save.es.ElasticFactory;
import com.alibaba.druid.sql.parser.Token;
import com.alibaba.fastjson.JSON;
import com.yihu.jw.statistics.vo.DataModel;
import com.yihu.jw.statistics.vo.SaveModel;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.nlpcn.es4sql.domain.Select;
import org.nlpcn.es4sql.domain.Where;
import org.nlpcn.es4sql.exception.SqlParseException;
import org.nlpcn.es4sql.jdbc.ObjectResult;
import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
import org.nlpcn.es4sql.parse.ElasticSqlExprParser;
import org.nlpcn.es4sql.parse.SqlParser;
import org.nlpcn.es4sql.parse.WhereParser;
import org.nlpcn.es4sql.query.AggregationQueryAction;
import org.nlpcn.es4sql.query.DefaultQueryAction;
import org.nlpcn.es4sql.query.QueryAction;
import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder;
import org.nlpcn.es4sql.query.maker.QueryMaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
@ -36,8 +53,8 @@ import java.util.*;
@Component
public class ElasticsearchUtil {
    private Logger logger= LoggerFactory.getLogger(ElasticsearchUtil.class);
    @Autowired
    private ElasticFactory elasticFactory;
    @Resource(name="restHighLevelClient")
    private RestHighLevelClient restHighLevelClient;
    @Value("${es.type}")
    private String esType;
    @Value("${es.index}")
@ -312,6 +329,80 @@ public class ElasticsearchUtil {
        return quotaDate + "T00:00:00+0800";
    }
    /**
     * 执行sql
     */
    public <T> List<T> executeSql(String sql,Class<T> clazz) throws Exception {
        //实例化查询请求对象
        SearchRequest request = new SearchRequest();
        //实例化SearchSourceBuilder
        SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
        //根据索引、查询条件构建查询构造器
        BoolQueryBuilder boolQueryBuilder = createQueryBuilderBySql(sql);
        //将查询构造器注入SearchSourceBuilder
        searchBuilder.query(boolQueryBuilder);
        //设置请求查询的索引(查询构造器中已指定,无需重复设置)
        //request.indices(indexName);
        //将构建好的SearchSourceBuilder注入请求
        request.source(searchBuilder);
        //带入请求执行查询
        SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        //得到查询结果
        SearchHits hits = searchResponse.getHits();
        SearchHit[] searchHits = hits.getHits();
        List<T> listData = new ArrayList<>();
        //遍历查询结果
        for(SearchHit hit : searchHits){
            Map<String,Object> datas = hit.getSourceAsMap();
            T t1 = JSON.parseObject(JSON.toJSONString(datas),clazz);
            listData.add(t1);
            logger.info(datas.toString());
        }
        return listData;
    }
    /**
     * 构建查询构造器
     * @return
     */
    public BoolQueryBuilder createQueryBuilderBySql(String sql) {
        BoolQueryBuilder boolQuery = null;
        try {
            SQLExprParser parser = new ElasticSqlExprParser(sql);
            SQLQueryExpr sqlExpr = (SQLQueryExpr) toSqlExpr(sql);
            SqlParser sqlParser = new SqlParser();
            MySqlSelectQueryBlock query = (MySqlSelectQueryBlock) sqlExpr.getSubQuery().getQuery();
            WhereParser whereParser = new WhereParser(sqlParser, query);
            Where where = whereParser.findWhere();
            if (where != null) {
                boolQuery = QueryMaker.explan(where);
            }
        } catch (SqlParseException e) {
            logger.info("ReadES.createQueryBuilderByExpress-Exception,"+e.getMessage());
            e.printStackTrace();
        }
        return boolQuery;
    }
    /**
     * 验证sql
     *
     * @param sql sql查询语句
     * @return and (a=1 and b=1) or (c=1 and d=1)
     */
    private SQLExpr toSqlExpr(String sql) {
        SQLExprParser parser = new ElasticSqlExprParser(sql);
        SQLExpr expr = parser.expr();
        if (parser.getLexer().token() != Token.EOF) {
            throw new ParserException("illegal sql expr : " + sql);
        }
        return expr;
    }
    /**
     * 执行sql查询es
     *
@ -336,11 +427,11 @@ public class ElasticsearchUtil {
            SqlElasticSearchRequestBuilder requestBuilder = null;
            if (select.isAgg) {
                //包含计算的的排序分组的
                action = new AggregationQueryAction(elasticFactory.getTransportClient(), select);
                action = new AggregationQueryAction((Client) restHighLevelClient.getLowLevelClient(), select);
                requestBuilder = action.explain();
            } else {
                //封装成自己的Select对象
                Client client = elasticFactory.getTransportClient();
                Client client = (Client) restHighLevelClient.getLowLevelClient();
                queryAction = new DefaultQueryAction(client, select);
                requestBuilder = queryAction.explain();
            }
@ -351,7 +442,7 @@ public class ElasticsearchUtil {
            }else{
                queryResult = response.getHits();
            }
            ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(queryResult, true);
            ObjectResult temp = new ObjectResultsExtractor(true, true, true,true,queryAction).extractResults(queryResult, true);
            List<String> heads = temp.getHeaders();
            temp.getLines().stream().forEach(one -> {
                try {
@ -411,15 +502,18 @@ public class ElasticsearchUtil {
            //通过抽象语法树,封装成自定义的Select,包含了select、from、where group、limit等
            AggregationQueryAction action = null;
            DefaultQueryAction queryAction = null;
            QueryAction queryAction1 = null;
            SqlElasticSearchRequestBuilder requestBuilder = null;
            if (select.isAgg) {
                //包含计算的的排序分组的
                action = new AggregationQueryAction(elasticFactory.getTransportClient(), select);
                action = new AggregationQueryAction((Client) restHighLevelClient.getLowLevelClient(), select);
                requestBuilder = action.explain();
                queryAction1 =action;
            } else {
                //封装成自己的Select对象
                queryAction = new DefaultQueryAction(elasticFactory.getTransportClient(), select);
                queryAction = new DefaultQueryAction((Client) restHighLevelClient.getLowLevelClient(), select);
                requestBuilder = queryAction.explain();
                queryAction1= queryAction;
            }
            SearchResponse response = (SearchResponse) requestBuilder.get();
            Object queryResult = null;
@ -428,7 +522,7 @@ public class ElasticsearchUtil {
            } else {
                queryResult = response.getHits();
            }
            ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(queryResult, true);
            ObjectResult temp = new ObjectResultsExtractor(true, true, true,true,queryAction1).extractResults(queryResult, true);
            List<String> heads = temp.getHeaders();
            temp.getLines().forEach(one -> {
                Object saveModel = null;
@ -533,16 +627,19 @@ public class ElasticsearchUtil {
            //通过抽象语法树,封装成自定义的Select,包含了select、from、where group、limit等
            AggregationQueryAction action = null;
            DefaultQueryAction queryAction = null;
            QueryAction queryAction1 = null;
            SqlElasticSearchRequestBuilder requestBuilder = null;
            if (select.isAgg) {
                //包含计算的的排序分组的
                action = new AggregationQueryAction(elasticFactory.getTransportClient(), select);
                action = new AggregationQueryAction((Client) restHighLevelClient.getLowLevelClient(), select);
                requestBuilder = action.explain();
                queryAction1=action;
            } else {
                //封装成自己的Select对象
                Client client = elasticFactory.getTransportClient();
                Client client = (Client) restHighLevelClient.getLowLevelClient();
                queryAction = new DefaultQueryAction(client, select);
                requestBuilder = queryAction.explain();
                queryAction1=queryAction;
            }
            SearchResponse response = (SearchResponse) requestBuilder.get();
            Object queryResult = null;
@ -551,7 +648,7 @@ public class ElasticsearchUtil {
            }else{
                queryResult = response.getHits();
            }
            ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(queryResult, true);
            ObjectResult temp = new ObjectResultsExtractor(true, true, true,true,queryAction1).extractResults(queryResult, true);
            List<String> heads = temp.getHeaders();
            for(List<Object> one:temp.getLines()){
//            temp.getLines().stream().forEach(one -> {

+ 5 - 5
svr/svr-statistics-hlw/src/main/resources/application.yml

@ -506,7 +506,7 @@ es:
  index: hlw_quota_test
  type: hlw_quota_test
  host: http://172.26.0.55:9000
  tHost: 172.26.0.55:9300
  tHost: 172.26.0.55:9200
  clusterName: jkzl
  securityUser: lion:jkzlehr
  user: lion
@ -744,12 +744,12 @@ es:
  pwflag: 1 # 1需要密码,0不需要密码
  index: hlw_quota_test
  type: hlw_quota_test
  host: http://172.26.0.55:9000
  host: http://172.26.0.55:9200
  tHost: 172.26.0.55:9300
  clusterName: jkzl
  securityUser: lion:jkzlehr
  user: lion
  password: jkzlehr
  securityUser: elastic:elastic
  user: elastic
  password: elastic
wlyy:
  im:

+ 1 - 1
wlyy-parent-pom/pom.xml

@ -120,7 +120,7 @@
        <version.elasticsearch-sql>2.4.1.0</version.elasticsearch-sql>
        <version.data-elasticsearch>2.1.3.RELEASE</version.data-elasticsearch>
        <version.jest>2.4.0</version.jest>
        <version.druid>1.0.15</version.druid>
        <version.druid>1.1.21</version.druid>
        <version.jna>4.1.0</version.jna>
        <version.hbase-client>2.2.4</version.hbase-client>
        <version.spring-data-hadoop>2.2.0.RELEASE</version.spring-data-hadoop>