LAPTOP-KB9HII50\70708 2 years ago
parent
commit
974dff6fef

+ 68 - 0
starter/elasticsearch-starter/pom.xml

@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>wlyy-starter</artifactId>
        <groupId>com.yihu.jw</groupId>
        <version>2.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>elasticsearch-starter</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${version.elasticsearch}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${version.elasticsearch}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${version.elasticsearch}</version>
        </dependency>
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>7.17.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${version.druid}</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
        </dependency>
        <dependency>
            <groupId>org.nlpcn</groupId>
            <artifactId>elasticsearch-sql</artifactId>
            <version>${version.elasticsearch-sql}</version>
        </dependency>
<!--        <dependency>
            <groupId>com.sun.jna</groupId>
            <artifactId>jna</artifactId>
        </dependency>-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${version.fastjson}</version>
        </dependency>
    </dependencies>
</project>

+ 13 - 0
starter/elasticsearch-starter/readme.md

@ -0,0 +1,13 @@
1. 该公共包提供了transport和jest的ES服务接口
2. 如果只需要transport请使用elasticsearch-transport-starter
#配置示例
spring:
  elasticsearch:
    cluster-name: jkzl #默认即为elasticsearch  集群名
    cluster-nodes: 172.19.103.45:9300,172.19.103.68:9300 #配置es节点信息,逗号分隔,如果没有指定,则启动ClientNode
    client-transport-sniff: false #是否开启嗅探
    jest:
      uris: http://172.19.103.45:9200,http://172.19.103.68:9200 #jest的uri
      connection-timeout: 60000 #Connection timeout in milliseconds.
      multi-threaded: true #Enable connection requests from multiple execution threads.

+ 196 - 0
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/ElasticSearch7Helper.java

@ -0,0 +1,196 @@
package com.yihu.jw.elasticsearch;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yihu.jw.elasticsearch.model.ESIDEntity;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.core.*;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
/**
 * Utils - 主要基于(jestClient)的搜索工具
 * Created by chenweida on 2017/6/2.
 */
@Component
public class ElasticSearch7Helper {
    private Logger logger = LoggerFactory.getLogger(ElasticSearch7Helper.class);
    @Resource(name="esClient")
    private RestHighLevelClient restHighLevelClient;
    public <T> Boolean save(String index, List<T> sources) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout(TimeValue.timeValueSeconds(10));
        for (int i = 0; i < sources.size(); i++) {
            bulkRequest.add(new IndexRequest(index)
                            // 不指定ID的话,新增时ID是随机的
//                    .id(items.get(i).getId().toString())
                            .source(JSON.toJSONString(sources.get(i)), XContentType.JSON)
            );
            // bulkRequest.add(UpdateRequest)   批量更新
            // bulkRequest.add(DeleteRequest)   批量删除
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        logger.info(bulkResponse.buildFailureMessage());
        return !bulkResponse.hasFailures();
    }
    public RestStatus save (String index, String source) throws IOException {
        IndexRequest indexRequest = new IndexRequest(index);
        indexRequest.timeout(TimeValue.timeValueSeconds(1));
        indexRequest.source(source, XContentType.JSON);
        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        return indexResponse.status();
    }
    /**
     * 自定义ID
     * @param index
     * @param source
     * @param idFieldString
     * @return
     */
    public RestStatus saveWithCustomId (String index, String source, String idFieldString) throws IOException {
        IndexRequest indexRequest = new IndexRequest(index);
        indexRequest.timeout(TimeValue.timeValueSeconds(1));
        indexRequest.id(idFieldString).source(source, XContentType.JSON);
        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        return indexResponse.status();
    }
    /**
     * 自定义ID
     *
     * @param index
     * @param sources
     * @param idFieldString
     * @return
     */
    public Boolean saveBulkWithCustomId (String index, List<String> sources, String idFieldString) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout(TimeValue.timeValueSeconds(10));
        for (int i = 0; i < sources.size(); i++) {
            JSONObject jsonObject = JSON.parseObject(sources.get(i));
            bulkRequest.add(new IndexRequest(index)
                    .id(jsonObject.getString(idFieldString))
                    .source(JSON.toJSONString(sources.get(i)), XContentType.JSON)
            );
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        logger.info(bulkResponse.buildFailureMessage());
        return !bulkResponse.hasFailures();
    }
    /**
     * 更新
     * @param index
     * @param sources
     * @return
     */
    public <T> Boolean update (String index, List<T> sources) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout(TimeValue.timeValueSeconds(10));
        for (int i = 0; i < sources.size(); i++) {
            bulkRequest.add(new UpdateRequest(index,((ESIDEntity) sources.get(i)).getId())
                            .doc(JSON.toJSONString(sources.get(i)), XContentType.JSON)
            );
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        logger.info(bulkResponse.buildFailureMessage());
        return !bulkResponse.hasFailures();
    }
    /**
     * 更新
     * @param index
     * @param type
     * @param list
     * @return
     */
    public Boolean updateByMap(String index, String type, List<Map<String, Object>> list) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout(TimeValue.timeValueSeconds(10));
        for (int i = 0; i < list.size(); i++) {
            Map map = list.get(i);
            bulkRequest.add(new UpdateRequest(index,String.valueOf(map.get("id")))
                    .doc(JSON.toJSONString(map), XContentType.JSON)
            );
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        logger.info(bulkResponse.buildFailureMessage());
        return !bulkResponse.hasFailures();
    }
    /**
     * 更新
     * @param index
     * @param _id
     * @param source
     * @return
     */
    public boolean update(String index, String _id, JSONObject source) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(index, _id);
        updateRequest.timeout(TimeValue.timeValueSeconds(1));
        updateRequest.doc(source.toJSONString(), XContentType.JSON);
        UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        logger.debug("update info: " + updateResponse.status());
        return true;
    }
    /**
     * 删除
     */
    public boolean delete(String index, String type, List<Map<String, Object>> datas) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout(TimeValue.timeValueSeconds(10));
        for (Map map : datas) {
            bulkRequest.add(new DeleteRequest(index)
                    .id(map.get("_id").toString()));
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        logger.info(bulkResponse.buildFailureMessage());
        return !bulkResponse.hasFailures();
    }
//    public SearchResult search(String index, String type, String queryStr) throws IOException {
//        Search search = ((new Search.Builder(queryStr)).addIndex(index)).addType(type).build();
//        SearchResult result = jestClient.execute(search);
//        this.logger.info("search data count: " + result.getTotal());
//        return result;
//    }
    public String search(String index, String id) throws IOException {
        GetRequest request = new GetRequest(index, id.toString());
        GetResponse getResponse = restHighLevelClient.get(request, RequestOptions.DEFAULT);
        return getResponse.getSourceAsString();
    }
}

+ 330 - 0
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/ElasticSearch7Pool.java

@ -0,0 +1,330 @@
package com.yihu.jw.elasticsearch;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Created by yeshijie on 2022/6/15.
 */
@Component
public class ElasticSearch7Pool {
    @Value("${ElasticSearch.Hosts}")
    private String hosts;
    @Value("${ElasticSearch.UserName}")
    private String userName;
    @Value("${ElasticSearch.Password}")
    private String password;
    @SuppressWarnings("deprecation")
    @Bean(name="esClient")
    public RestHighLevelClient getClient() {
        String[] hosts = this.hosts.split(",");
        HttpHost[] httpHosts = new HttpHost[hosts.length];
        for(int i=0;i<hosts.length;i++) {
            httpHosts[i] = new HttpHost(hosts[i], 9200, "http");
        }
        //设置密码
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
        //设置超时
        RestClientBuilder builder = RestClient.builder(httpHosts).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
            @Override
            public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                requestConfigBuilder.setConnectTimeout(-1);
                requestConfigBuilder.setSocketTimeout(-1);
                requestConfigBuilder.setConnectionRequestTimeout(-1);
                return requestConfigBuilder;
            }
        }).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.disableAuthCaching();
                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
        });
//        }).setMaxRetryTimeoutMillis(5*60*1000);
        RestHighLevelClient client = new RestHighLevelClient(builder);
        return client;
    }
    /**
     * 创建索引
     * @throws IOException
     */
    public void testCreateIndex(RestHighLevelClient restHighLevelClient,String index) throws IOException {
        //1. 创建索引请求
        CreateIndexRequest request = new CreateIndexRequest(index);
        //2. 客户端执行请求 IndicesClient,请求后获得响应
        CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
        //输出是否创建成功
        System.out.println(response.isAcknowledged());
    }
    /**
     * 查询是否存在索引
     * @throws IOException
     */
    public void testExistIndex(RestHighLevelClient restHighLevelClient,String index) throws IOException {
        GetIndexRequest request = new GetIndexRequest(index);
        boolean exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
        System.out.println(exists);
    }
    /**
     * 删除索引
     */
    public void testDeleteIndex(RestHighLevelClient restHighLevelClient,String index) throws IOException {
        //创建删除索引请求对象
        DeleteIndexRequest request = new DeleteIndexRequest(index);
        //客户端执行请求
        AcknowledgedResponse response = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
        //输出是否删除成功
        System.out.println(response.isAcknowledged());
    }
    /**
     * 添加文档
     */
    public void addDocument(RestHighLevelClient restHighLevelClient,String index) throws IOException {
//        //获取对象
//        Goods goods = goodsDao.findById(12991976L);
//        //创建请求
//        IndexRequest request = new IndexRequest(index);
//        //规则 put /siyi/_doc/1
//        request.id("1");
//        request.timeout(TimeValue.timeValueSeconds(1));
//        //将我们的数据放入请求
//        request.source(JSON.toJSONString(goods), XContentType.JSON);
//        //客户端发送请求
//        IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
//        //IndexResponse[index=siyi,type=_doc,id=1,version=1,result=created,seqNo=0,primaryTerm=1,shards={"total":2,"successful":1,"failed":0}]
//        System.out.println(response);
//        //响应状态 CREATED
//        System.out.println(response.status());
    }
    /**
     * 删除文档
     * @throws IOException
     */
    public void testDeleteDocument(RestHighLevelClient restHighLevelClient,String index) throws IOException {
        DeleteRequest request = new DeleteRequest(index,"1");
        request.timeout("1s");
        DeleteResponse response = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
        //DeleteResponse[index=siyi,type=_doc,id=1,version=3,result=deleted,shards=ShardInfo{total=2, successful=1, failures=[]}]
        System.out.println(response);
        //OK
        System.out.println(response.status());
    }
    //3.3 修改文档
    public void testUpdateDocument(RestHighLevelClient restHighLevelClient,String index) throws IOException {
        UpdateRequest request = new UpdateRequest(index,"1");
        request.timeout("1s");
//        Goods goods = goodsDao.findById(12991976L);
//        goods.setTitle("xxxx");
//        request.doc(JSON.toJSONString(goods),XContentType.JSON);
        UpdateResponse response = restHighLevelClient.update(request, RequestOptions.DEFAULT);
        //UpdateResponse[index=siyi,type=_doc,id=1,version=2,seqNo=1,primaryTerm=1,result=updated,shards=ShardInfo{total=2, successful=1, failures=[]}]
        System.out.println(response);
        System.out.println(response.status());
    }
    /**
     * 批量新增文档
     */
    public void testBulkRequest(RestHighLevelClient restHighLevelClient,String index) throws IOException {
        BulkRequest request = new BulkRequest();
        request.timeout("1s");
//        List<Goods> list = goodsDao.findAll();
//
//        list.forEach(goods -> {
//            //批量更新或者删除只需修改方法即可
//            request.add(new IndexRequest(index)
//                    .id(""+goods.getId())
//                    .source(JSON.toJSONString(goods),XContentType.JSON));
//        });
        BulkResponse responses = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        System.out.println(responses);
        //是否删除失败 返回false代表删除成功 false
        System.out.println(responses.hasFailures());
    }
    /**
     *  当我们需要使用批量增删改数据时,只需要使用BulkRequest的add,delete,update中相应方法即可。
     * 获取文档,判断是否存在 get /index/doc/1
     */
    public void testIsExists(RestHighLevelClient restHighLevelClient,String index) throws IOException {
        GetRequest request = new GetRequest(index,"1");
        //不获取返回的 _source的上下文了
        request.fetchSourceContext(new FetchSourceContext(false));
        request.storedFields("_none");
        boolean exists = restHighLevelClient.exists(request, RequestOptions.DEFAULT);
        System.out.println(exists);
    }
    /**
     * 获取文档信息
     * @throws IOException
     */
    public void testGetDocument(RestHighLevelClient restHighLevelClient,String index) throws IOException {
        GetRequest request = new GetRequest(index,"1");
        GetResponse response = restHighLevelClient.get(request, RequestOptions.DEFAULT);
        System.out.println(response.getSourceAsString());
        System.out.println(response);
    }
    /**
     * 复杂查询
     */
    public void testSearch(RestHighLevelClient restHighLevelClient,String index) throws IOException {
        SearchRequest request = new SearchRequest(index);
        //构建搜索条件
        SearchSourceBuilder builder = new SearchSourceBuilder();
        //查询条件,我们可以使用QueryBuilders工具来实现
        //QueryBuilders.matchAllQuery() 匹配所有
        //QueryBuilders.termQuery() 精确匹配
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "java");
        builder.query(termQueryBuilder);
        builder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        System.out.println(JSON.toJSONString(response.getHits()));
        System.out.println("----------------------");
        for (SearchHit hit : response.getHits().getHits()) {
            System.out.println(hit.getSourceAsMap());
        }
    }
    /**
     *     当然,我们可以向直接操作ElasticSearch一样去构建查询条件。
     *
     *     在java中有对应的类来实现我们需要构造的查询条件。
     * 分页查询
     */
    public void testSearch2(RestHighLevelClient restHighLevelClient,String index) throws IOException {
        SearchRequest request = new SearchRequest(index);
        //构建搜索条件
        SearchSourceBuilder builder = new SearchSourceBuilder();
        //查询条件,我们可以使用QueryBuilders工具来实现
        //QueryBuilders.matchAllQuery() 匹配所有
        //QueryBuilders.termQuery() 精确匹配
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "java");
        builder.query(termQueryBuilder);
        builder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        //分页查询,第0页 每页30
        builder.from(0);
        builder.size(30);
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        System.out.println(JSON.toJSONString(response.getHits()));
        System.out.println("----------------------");
        for (SearchHit hit : response.getHits().getHits()) {
            System.out.println(hit.getSourceAsMap());
        }
    }
    /**
     * 查询结果高亮显示
     */
    public void testSearch1(RestHighLevelClient restHighLevelClient,String index) throws IOException {
        SearchRequest request = new SearchRequest(index);
        //构建搜索条件
        SearchSourceBuilder builder = new SearchSourceBuilder();
        //查询条件,我们可以使用QueryBuilders工具来实现
        //QueryBuilders.matchAllQuery() 匹配所有
        //QueryBuilders.termQuery() 精确匹配
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "java");
        builder.query(termQueryBuilder);
        builder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        //分页查询,第0页 每页30
        builder.from(0);
        builder.size(30);
        //设置高亮
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        //高亮显示的字段
        highlightBuilder.field("title");
        highlightBuilder.preTags("<em>");
        highlightBuilder.postTags("</em>");
        builder.highlighter(highlightBuilder);
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        System.out.println(JSON.toJSONString(response.getHits()));
        System.out.println("----------------------");
        for (SearchHit hit : response.getHits().getHits()) {
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
            System.out.println(sourceAsMap);
            //输出高亮情况
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            HighlightField highlightField = highlightFields.get("title");
            if (highlightField == null) continue;
            Text[] fragments = highlightField.getFragments();
            String title = "";
            for (Text fragment : fragments) {
                title += fragment;
            }
            //使用有高亮的结果替换原本的结果,
            sourceAsMap.put("title", title);
            System.out.println(sourceAsMap);
        }
    }
}

+ 363 - 0
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/ElasticSearch7Util.java

@ -0,0 +1,363 @@
package com.yihu.jw.elasticsearch;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
import com.alibaba.druid.sql.parser.ParserException;
import com.alibaba.druid.sql.parser.SQLExprParser;
import com.alibaba.druid.sql.parser.Token;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.nlpcn.es4sql.domain.Select;
import org.nlpcn.es4sql.domain.Where;
import org.nlpcn.es4sql.exception.SqlParseException;
import org.nlpcn.es4sql.jdbc.ObjectResult;
import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
import org.nlpcn.es4sql.parse.ElasticSqlExprParser;
import org.nlpcn.es4sql.parse.SqlParser;
import org.nlpcn.es4sql.parse.WhereParser;
import org.nlpcn.es4sql.query.AggregationQueryAction;
import org.nlpcn.es4sql.query.DefaultQueryAction;
import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder;
import org.nlpcn.es4sql.query.maker.QueryMaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.*;
/**
 * elasticsearch7.x版本数据获取
 * Created by yeshijie on 2022/6/15.
 */
@Service
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class ElasticSearch7Util {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearch7Util.class);
    @Resource(name="esClient")
    private RestHighLevelClient restHighLevelClient;
    /**
     * 执行sql
     * @param sql
     * @return
     */
    public Integer executeCountSQL(String sql) throws Exception {
        //实例化查询请求对象
        SearchRequest request = new SearchRequest();
        //实例化SearchSourceBuilder
        SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
        //根据索引、查询条件构建查询构造器
        BoolQueryBuilder boolQueryBuilder = createQueryBuilderBySql(sql);
        //将查询构造器注入SearchSourceBuilder
        searchBuilder.query(boolQueryBuilder);
        //设置请求查询的索引(查询构造器中已指定,无需重复设置)
        //request.indices(indexName);
        //将构建好的SearchSourceBuilder注入请求
        request.source(searchBuilder);
        //带入请求执行查询
        SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        //得到查询结果
        SearchHits hits = searchResponse.getHits();
        SearchHit[] searchHits = hits.getHits();
        //遍历查询结果
        for(SearchHit hit : searchHits){
            Map<String,Object> datas = hit.getSourceAsMap();
            for (Object o:datas.values()){
                if (o instanceof Double){
                    Double valueTemp =Double.valueOf((Double)o);
                    DecimalFormat df = new DecimalFormat("######0");
                    return Integer.parseInt(df.format(valueTemp));
                }
            }
        }
        return 0;
    }
    /**
     * 执行sql
     */
    public List<Map<String,Object>> executeSQL(String sql) throws IOException {
        //实例化查询请求对象
        SearchRequest request = new SearchRequest();
        //实例化SearchSourceBuilder
        SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
        //根据索引、查询条件构建查询构造器
        BoolQueryBuilder boolQueryBuilder = createQueryBuilderBySql(sql);
        //将查询构造器注入SearchSourceBuilder
        searchBuilder.query(boolQueryBuilder);
        //设置请求查询的索引(查询构造器中已指定,无需重复设置)
        //request.indices(indexName);
        //将构建好的SearchSourceBuilder注入请求
        request.source(searchBuilder);
        //带入请求执行查询
        SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        //得到查询结果
        SearchHits hits = searchResponse.getHits();
        SearchHit[] searchHits = hits.getHits();
        List<Map<String,Object>> listData = new ArrayList<>();
        //遍历查询结果
        for(SearchHit hit : searchHits){
            Map<String,Object> datas = hit.getSourceAsMap();
            listData.add(datas);
            logger.info(datas.toString());
        }
        return listData;
    }
    /**
     * 构建查询构造器
     * @param indexName  索引名
     * @param whereExpress  查询条件:(f1=2 and f2=1) or (f3=1 and f4=1)
     * @return
     */
    public BoolQueryBuilder createQueryBuilderByWhere(String indexName, String whereExpress) {
        BoolQueryBuilder boolQuery = null;
        try {
            String sql = "select * from " + indexName;
            String whereTemp = "";
            if (StringUtils.isNotBlank(whereExpress)) {
                whereTemp = " where 1=1 " + whereExpress;
            }
            SQLQueryExpr sqlExpr = (SQLQueryExpr) toSqlExpr(sql + whereTemp);
            SqlParser sqlParser = new SqlParser();
            MySqlSelectQueryBlock query = (MySqlSelectQueryBlock) sqlExpr.getSubQuery().getQuery();
            WhereParser whereParser = new WhereParser(sqlParser, query);
            Where where = whereParser.findWhere();
            if (where != null) {
                boolQuery = QueryMaker.explan(where);
            }
        } catch (SqlParseException e) {
            logger.info("ReadES.createQueryBuilderByExpress-Exception,"+e.getMessage());
            e.printStackTrace();
        }
        return boolQuery;
    }
    /**
     * 构建查询构造器
     * @return
     */
    public BoolQueryBuilder createQueryBuilderBySql(String sql) {
        BoolQueryBuilder boolQuery = null;
        try {
            SQLQueryExpr sqlExpr = (SQLQueryExpr) toSqlExpr(sql);
            SqlParser sqlParser = new SqlParser();
            MySqlSelectQueryBlock query = (MySqlSelectQueryBlock) sqlExpr.getSubQuery().getQuery();
            WhereParser whereParser = new WhereParser(sqlParser, query);
            Where where = whereParser.findWhere();
            if (where != null) {
                boolQuery = QueryMaker.explan(where);
            }
        } catch (SqlParseException e) {
            logger.info("ReadES.createQueryBuilderByExpress-Exception,"+e.getMessage());
            e.printStackTrace();
        }
        return boolQuery;
    }
    /**
     * 验证sql
     *
     * @param sql sql查询语句
     * @return and (a=1 and b=1) or (c=1 and d=1)
     */
    private SQLExpr toSqlExpr(String sql) {
        SQLExprParser parser = new ElasticSqlExprParser(sql);
        SQLExpr expr = parser.expr();
        if (parser.getLexer().token() != Token.EOF) {
            throw new ParserException("illegal sql expr : " + sql);
        }
        return expr;
    }
    /**
     * 查询指定索引下的信息
     * @param indexName 索引名称
     * @param condition 查询条件
     *  queryIndexContent("user_site"," and phone_no in('12234567890') ");
     */
    public List<Map<String,Object>> queryIndexContent(String indexName, String condition) throws IOException {
        //实例化查询请求对象
        SearchRequest request = new SearchRequest();
        //实例化SearchSourceBuilder
        SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
        //根据索引、查询条件构建查询构造器
        BoolQueryBuilder boolQueryBuilder = createQueryBuilderByWhere(indexName,condition);
        //将查询构造器注入SearchSourceBuilder
        searchBuilder.query(boolQueryBuilder);
        //设置请求查询的索引(查询构造器中已指定,无需重复设置)
        //request.indices(indexName);
        //将构建好的SearchSourceBuilder注入请求
        request.source(searchBuilder);
        //带入请求执行查询
        SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        //得到查询结果
        SearchHits hits = searchResponse.getHits();
        SearchHit[] searchHits = hits.getHits();
        List<Map<String,Object>> listData = new ArrayList<>();
        //遍历查询结果
        for(SearchHit hit : searchHits){
            Map<String,Object> datas = hit.getSourceAsMap();
            listData.add(datas);
            logger.info(datas.toString());
        }
        return listData;
    }
    /**
     * 查询语句转换
     * @param filters
     * @return
     */
    public QueryBuilder getQueryBuilder(String filters) {
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (StringUtils.isEmpty(filters)) {
            return boolQueryBuilder;
        }
        String [] filterArr = filters.split(";");
        for (String filter : filterArr) {
            if (filter.contains("||")){
                String [] fields = filter.split("\\|\\|");
                BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
                for (String filed : fields) {
                    String [] condition = filed.split("=");
                    queryBuilder.should(QueryBuilders.termQuery(condition[0], condition[1]));
                }
                boolQueryBuilder.must(queryBuilder);
            } else if (filter.contains("?")) {
                String [] condition = filter.split("\\?");
                MatchPhraseQueryBuilder matchPhraseQueryBuilder = QueryBuilders.matchPhraseQuery(condition[0], condition[1]);
                boolQueryBuilder.must(matchPhraseQueryBuilder);
            } else if (filter.contains("<>")) {
                String [] condition = filter.split("<>");
                if (condition[1].contains(",")) {
                    String [] inCondition = condition[1].split(",");
                    TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(condition[0], inCondition);
                    boolQueryBuilder.mustNot(termsQueryBuilder);
                } else {
                    TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(condition[0], condition[1]);
                    boolQueryBuilder.mustNot(termQueryBuilder);
                }
            } else if (filter.contains(">=")) {
                String [] condition = filter.split(">=");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.gte(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains(">")) {
                String [] condition = filter.split(">");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.gt(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains("<=")) {
                String [] condition = filter.split("<=");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.lte(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains("<")) {
                String [] condition = filter.split("<");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.lt(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains("=")) {
                String [] condition = filter.split("=");
                if (condition[1].contains(",")) {
                    String [] inCondition = condition[1].split(",");
                    TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(condition[0], inCondition);
                    boolQueryBuilder.must(termsQueryBuilder);
                } else {
                    TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(condition[0], condition[1]);
                    boolQueryBuilder.must(termQueryBuilder);
                }
            }
        }
        return boolQueryBuilder;
    }
    /**
     * 多条件查询
     * @param mustMap
     * @param length
     * @return
     */
    public List<String> multiSearch(Map<String,Object> mustMap,int length,String index) {
        // 根据多个条件 生成 boolQueryBuilder
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        // 循环添加多个条件
        for (Map.Entry<String, Object> entry : mustMap.entrySet()) {
            boolQueryBuilder.must(QueryBuilders
                    .matchQuery(entry.getKey(), entry.getValue()));
        }
        // boolQueryBuilder生效
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQueryBuilder);
        searchSourceBuilder.size(length);
        // 其中listSearchResult是自己编写的方法,以供多中查询方式使用。
        return listSearchResult(searchSourceBuilder,index);
    }
    /**
     * 用来处理搜索结果,转换成链表
     * @param builder
     * @return
     */
    public List<String> listSearchResult(SearchSourceBuilder builder,String index) {
        // 提交查询
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.source(builder);
        // 获得response
        SearchResponse searchResponse = null;
        try {
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // 从response中获得结果
        List<String> list = new LinkedList<>();
        SearchHits hits = searchResponse.getHits();
        Iterator<SearchHit> iterator = hits.iterator();
        while(iterator.hasNext()) {
            SearchHit next = iterator.next();
            list.add(next.getSourceAsString());
        }
        return list;
    }
}

+ 62 - 0
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/TestEs7.java

@ -0,0 +1,62 @@
package com.yihu.jw.elasticsearch;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
/**
 * Created by yeshijie on 2022/6/15.
 */
public class TestEs7 {
    public static void main(String[] args) throws Exception{
        String host = "172.26.0.56";
        String userName = "elastic";
        String password = "elastic";
        String[] hosts = host.split(",");
        HttpHost[] httpHosts = new HttpHost[hosts.length];
        for(int i=0;i<hosts.length;i++) {
            httpHosts[i] = new HttpHost(hosts[i], 9200, "http");
        }
        //设置密码
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
        //设置超时
        RestClientBuilder builder = RestClient.builder(httpHosts).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
            @Override
            public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                requestConfigBuilder.setConnectTimeout(-1);
                requestConfigBuilder.setSocketTimeout(-1);
                requestConfigBuilder.setConnectionRequestTimeout(-1);
                return requestConfigBuilder;
            }
        }).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.disableAuthCaching();
                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
        });
//        }).setMaxRetryTimeoutMillis(5*60*1000);
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);
        String index = "body_health_data";
        GetIndexRequest request = new GetIndexRequest(index);
        boolean exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
        System.out.println(exists);
    }
}

+ 63 - 0
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/config/ElasticSearchConfig.java

@ -0,0 +1,63 @@
package com.yihu.jw.elasticsearch.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
 * Config - ES TransportClient 配置
 * @author progr1mmer
 * @date Created on 2017/12/1.
 */
//@ConfigurationProperties(prefix = "spring.elasticsearch")
@Configuration
public class ElasticSearchConfig {
    /**
     * 集群名称
     */
    private String clusterName;
    /**
     * 节点
     */
    private String clusterNodes;
    /**
     * 是否开启嗅探
     */
    private boolean clientTransportSniff;
    public String getClusterName() {
        return clusterName;
    }
    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }
    public String getClusterNodes() {
        return clusterNodes;
    }
    public void setClusterNodes(String clusterNodes) {
        this.clusterNodes = clusterNodes;
    }
    public boolean isClientTransportSniff() {
        return clientTransportSniff;
    }
    public void setClientTransportSniff(boolean clientTransportSniff) {
        this.clientTransportSniff = clientTransportSniff;
    }
    @PostConstruct
    private void configInfo() {
        StringBuilder info = new StringBuilder("{");
        info.append("\n  elasticsearch.cluster-name = " + clusterName);
        info.append("\n  elasticsearch.cluster-nodes = " + clusterNodes);
        info.append("\n  elasticsearch.client-transport-sniff = " + clientTransportSniff);
        info.append("\n}");
        System.out.println("Elasticsearch.configInfo : " + info.toString());
    }
}

+ 130 - 0
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/impl/DefaultHignLevelDocumentHandler.java

@ -0,0 +1,130 @@
package com.yihu.jw.elasticsearch.impl;
import com.alibaba.fastjson.JSON;
import jdk.nashorn.internal.objects.annotations.Getter;
import jdk.nashorn.internal.objects.annotations.Setter;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
/**
 * Created by yeshijie on 2022/6/15.
 */
public class DefaultHignLevelDocumentHandler<T, ID> implements HignLevelDocumentHandler<T, ID> {
    @Autowired
    @Qualifier("restHighLevelClient")
    private RestHighLevelClient client;
    public void save(T t,String indexName) throws Exception {
        if (judgeId(t)) {
            Field id = t.getClass().getDeclaredField("id");
            save(t, id.toString());
            return;
        }
        save(t,null, indexName);
    }
    public void save(T t, String id,String indexName) throws IOException {
        jsonSave(t, id, TimeValue.timeValueSeconds(1),indexName);
    }
    public void jsonSave(T t, String id, TimeValue timeValue,String indexName) throws IOException {
        IndexRequest indexRequest = new IndexRequest(indexName);
        indexRequest.id(id);
        indexRequest.timeout(timeValue);
        indexRequest.source(JSON.toJSONString(t), XContentType.JSON);
        client.index(indexRequest, RequestOptions.DEFAULT);
    }
    public String detail(ID id,String indexName) throws IOException {
        GetRequest request = new GetRequest(indexName, id.toString());
        GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
        return getResponse.getSourceAsString();
    }
    public void update(T t,String indexName) throws Exception {
        if (judgeId(t)) {
            Field id = t.getClass().getDeclaredField("id");
            save(t, id.toString());
        } else {
            update(t, null,indexName);
        }
    }
    public void update(T t, String id,String indexName) throws Exception {
        update(t, id, TimeValue.timeValueSeconds(1),indexName);
    }
    public void update(T t, String id, TimeValue timeValue,String indexName) throws Exception {
        UpdateRequest updateRequest = new UpdateRequest(indexName, id);
        updateRequest.timeout(timeValue);
        updateRequest.doc(JSON.toJSONString(t), XContentType.JSON);
        client.update(updateRequest, RequestOptions.DEFAULT);
    }
    public void delete(ID id,String indexName) throws IOException {
        delete(id, TimeValue.timeValueSeconds(1),indexName);
    }
    public void delete(ID id, String timeout,String indexName) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(indexName);
        deleteRequest.timeout(timeout);
        deleteRequest.id(id.toString());
        client.delete(deleteRequest, RequestOptions.DEFAULT);
    }
    public void delete(ID id, TimeValue timeValue,String indexName) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(indexName);
        deleteRequest.timeout(timeValue);
        deleteRequest.id(id.toString());
        client.delete(deleteRequest, RequestOptions.DEFAULT);
    }
    private boolean judgeId(T t){
        //获取这个类的所有属性
        Field[] fields = t.getClass().getDeclaredFields();
        boolean flag = false;
        //循环遍历所有的fields
        for (int i = 0; i < fields.length; i++) {
            if (fields[i].getName().equals("id")) {
                flag = true;
                break;
            }
        }
        return flag;
    }
    public String bulk(List<T> list,String indexName) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout(TimeValue.timeValueSeconds(10));
        for (int i = 0; i < list.size(); i++) {
            bulkRequest.add(new IndexRequest(indexName)
                    // 不指定ID的话,新增时ID是随机的
//                    .id(items.get(i).getId().toString())
                    .source(JSON.toJSONString(list.get(i)), XContentType.JSON)
            );
            // bulkRequest.add(UpdateRequest)   批量更新
            // bulkRequest.add(DeleteRequest)   批量删除
        }
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        return "bulk insert OK";
    }
}

+ 7 - 0
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/impl/HignLevelDocumentHandler.java

@ -0,0 +1,7 @@
package com.yihu.jw.elasticsearch.impl;
/**
 * Created by yeshijie on 2022/6/15.
 */
public interface HignLevelDocumentHandler<T, ID> {
}

+ 21 - 0
starter/elasticsearch-starter/src/main/java/com/yihu/jw/elasticsearch/model/ESIDEntity.java

@ -0,0 +1,21 @@
package com.yihu.jw.elasticsearch.model;
import io.searchbox.annotations.JestId;
/**
 * Model - es保存model的公共父类
 * Created by chenweida on 2017/11/3.
 */
public class ESIDEntity {
    @JestId
    private String id;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
}