|
@ -1,14 +1,13 @@
|
|
|
package com.yihu.jw.elasticsearch;
|
|
|
|
|
|
import com.alibaba.druid.pool.DruidDataSource;
|
|
|
import com.alibaba.druid.sql.ast.SQLExpr;
|
|
|
import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
|
|
|
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
|
|
|
import com.alibaba.druid.sql.parser.ParserException;
|
|
|
import com.alibaba.druid.sql.parser.SQLExprParser;
|
|
|
import com.alibaba.druid.sql.parser.Token;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import io.searchbox.core.Update;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
@ -26,8 +25,6 @@ import org.elasticsearch.action.update.UpdateResponse;
|
|
|
import org.elasticsearch.client.RequestOptions;
|
|
|
import org.elasticsearch.client.RestHighLevelClient;
|
|
|
import org.elasticsearch.client.indices.CreateIndexRequest;
|
|
|
import org.elasticsearch.client.transport.TransportClient;
|
|
|
import org.elasticsearch.common.xcontent.ToXContent;
|
|
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
@ -50,13 +47,9 @@ import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
|
|
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
import org.elasticsearch.search.sort.FieldSortBuilder;
|
|
|
import org.elasticsearch.search.sort.SortBuilder;
|
|
|
import org.elasticsearch.search.sort.SortBuilders;
|
|
|
import org.elasticsearch.search.sort.SortOrder;
|
|
|
import org.nlpcn.es4sql.domain.Select;
|
|
|
import org.nlpcn.es4sql.domain.Where;
|
|
|
import org.nlpcn.es4sql.exception.SqlParseException;
|
|
|
import org.nlpcn.es4sql.jdbc.ObjectResult;
|
|
|
import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
|
|
|
import org.nlpcn.es4sql.parse.ElasticSqlExprParser;
|
|
|
import org.nlpcn.es4sql.parse.SqlParser;
|
|
|
import org.nlpcn.es4sql.parse.WhereParser;
|
|
@ -74,8 +67,6 @@ import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
import java.io.IOException;
|
|
|
import java.sql.Connection;
|
|
|
import java.sql.PreparedStatement;
|
|
|
import java.sql.ResultSet;
|
|
|
import java.text.DecimalFormat;
|
|
|
import java.util.*;
|
|
@ -90,7 +81,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(ElasticSearch7Util.class);
|
|
|
|
|
|
@Resource(name="restHighLevelClient")
|
|
|
@Resource(name = "restHighLevelClient")
|
|
|
private RestHighLevelClient restHighLevelClient;
|
|
|
|
|
|
@Autowired
|
|
@ -101,6 +92,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 执行sql
|
|
|
*
|
|
|
* @param sql
|
|
|
* @return
|
|
|
*/
|
|
@ -125,11 +117,11 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
SearchHit[] searchHits = hits.getHits();
|
|
|
//遍历查询结果
|
|
|
for(SearchHit hit : searchHits){
|
|
|
Map<String,Object> datas = hit.getSourceAsMap();
|
|
|
for (Object o:datas.values()){
|
|
|
if (o instanceof Double){
|
|
|
Double valueTemp =Double.valueOf((Double)o);
|
|
|
for (SearchHit hit : searchHits) {
|
|
|
Map<String, Object> datas = hit.getSourceAsMap();
|
|
|
for (Object o : datas.values()) {
|
|
|
if (o instanceof Double) {
|
|
|
Double valueTemp = Double.valueOf((Double) o);
|
|
|
DecimalFormat df = new DecimalFormat("######0");
|
|
|
return Long.parseLong(df.format(valueTemp));
|
|
|
}
|
|
@ -138,20 +130,20 @@ public class ElasticSearch7Util {
|
|
|
return 0L;
|
|
|
}
|
|
|
|
|
|
public List<Map<String,Object>> executeSQLStream(String sql) throws Exception {
|
|
|
public List<Map<String, Object>> executeSQLStream(String sql) throws Exception {
|
|
|
ResultSet resultSet = elasticSearch7Pool.restHighLevelClientStream(sql);
|
|
|
return resultSetUtil.resultToMapList(resultSet);
|
|
|
}
|
|
|
|
|
|
public <T> List<T> executeSQLStreamDO(String sql,Class<T> target) throws Exception {
|
|
|
public <T> List<T> executeSQLStreamDO(String sql, Class<T> target) throws Exception {
|
|
|
ResultSet resultSet = elasticSearch7Pool.restHighLevelClientStream(sql);
|
|
|
return resultSetUtil.resultToEntity(resultSet,target);
|
|
|
return resultSetUtil.resultToEntity(resultSet, target);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 执行sql
|
|
|
*/
|
|
|
public List<Map<String,Object>> executeSQL(String sql) throws IOException {
|
|
|
public List<Map<String, Object>> executeSQL(String sql) throws IOException {
|
|
|
//实例化查询请求对象
|
|
|
SearchRequest request = new SearchRequest();
|
|
|
//实例化SearchSourceBuilder
|
|
@ -159,7 +151,7 @@ public class ElasticSearch7Util {
|
|
|
//根据索引、查询条件构建查询构造器
|
|
|
BoolQueryBuilder boolQueryBuilder = createQueryBuilderBySql(sql);
|
|
|
//将查询构造器注入SearchSourceBuilder
|
|
|
searchBuilder.query(boolQueryBuilder).sort("quotaDate",SortOrder.DESC).sort("createTime",SortOrder.DESC);
|
|
|
searchBuilder.query(boolQueryBuilder).sort("quotaDate", SortOrder.DESC).sort("createTime", SortOrder.DESC);
|
|
|
//设置请求查询的索引(查询构造器中已指定,无需重复设置)
|
|
|
//request.indices(indexName);
|
|
|
//将构建好的SearchSourceBuilder注入请求
|
|
@ -171,10 +163,10 @@ public class ElasticSearch7Util {
|
|
|
SearchHits hits = searchResponse.getHits();
|
|
|
|
|
|
SearchHit[] searchHits = hits.getHits();
|
|
|
List<Map<String,Object>> listData = new ArrayList<>();
|
|
|
List<Map<String, Object>> listData = new ArrayList<>();
|
|
|
//遍历查询结果
|
|
|
for(SearchHit hit : searchHits){
|
|
|
Map<String,Object> datas = hit.getSourceAsMap();
|
|
|
for (SearchHit hit : searchHits) {
|
|
|
Map<String, Object> datas = hit.getSourceAsMap();
|
|
|
listData.add(datas);
|
|
|
logger.info(datas.toString());
|
|
|
}
|
|
@ -185,7 +177,7 @@ public class ElasticSearch7Util {
|
|
|
/**
|
|
|
* 执行sql
|
|
|
*/
|
|
|
public <T> List<T> executeSql(String sql,Class<T> clazz) throws Exception {
|
|
|
public <T> List<T> executeSql(String sql, Class<T> clazz) throws Exception {
|
|
|
//实例化查询请求对象
|
|
|
SearchRequest request = new SearchRequest();
|
|
|
//实例化SearchSourceBuilder
|
|
@ -207,10 +199,10 @@ public class ElasticSearch7Util {
|
|
|
SearchHit[] searchHits = hits.getHits();
|
|
|
List<T> listData = new ArrayList<>();
|
|
|
//遍历查询结果
|
|
|
for(SearchHit hit : searchHits){
|
|
|
Map<String,Object> datas = hit.getSourceAsMap();
|
|
|
for (SearchHit hit : searchHits) {
|
|
|
Map<String, Object> datas = hit.getSourceAsMap();
|
|
|
T t = clazz.newInstance();
|
|
|
BeanUtils.copyProperties(t,datas);
|
|
|
BeanUtils.copyProperties(t, datas);
|
|
|
listData.add(t);
|
|
|
logger.info(datas.toString());
|
|
|
}
|
|
@ -220,8 +212,9 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 构建查询构造器
|
|
|
* @param indexName 索引名
|
|
|
* @param whereExpress 查询条件:(f1=2 and f2=1) or (f3=1 and f4=1)
|
|
|
*
|
|
|
* @param indexName 索引名
|
|
|
* @param whereExpress 查询条件:(f1=2 and f2=1) or (f3=1 and f4=1)
|
|
|
* @return
|
|
|
*/
|
|
|
public BoolQueryBuilder createQueryBuilderByWhere(String indexName, String whereExpress) {
|
|
@ -242,7 +235,7 @@ public class ElasticSearch7Util {
|
|
|
boolQuery = QueryMaker.explan(where);
|
|
|
}
|
|
|
} catch (SqlParseException e) {
|
|
|
logger.info("ReadES.createQueryBuilderByExpress-Exception,"+e.getMessage());
|
|
|
logger.info("ReadES.createQueryBuilderByExpress-Exception," + e.getMessage());
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
return boolQuery;
|
|
@ -250,6 +243,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 构建查询构造器
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
public BoolQueryBuilder createQueryBuilderBySql(String sql) {
|
|
@ -266,7 +260,7 @@ public class ElasticSearch7Util {
|
|
|
boolQuery = QueryMaker.explan(where);
|
|
|
}
|
|
|
} catch (SqlParseException e) {
|
|
|
logger.info("ReadES.createQueryBuilderByExpress-Exception,"+e.getMessage());
|
|
|
logger.info("ReadES.createQueryBuilderByExpress-Exception," + e.getMessage());
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
return boolQuery;
|
|
@ -290,17 +284,18 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 查询指定索引下的信息
|
|
|
*
|
|
|
* @param indexName 索引名称
|
|
|
* @param condition 查询条件
|
|
|
* queryIndexContent("user_site"," and phone_no in('12234567890') ");
|
|
|
* queryIndexContent("user_site"," and phone_no in('12234567890') ");
|
|
|
*/
|
|
|
public List<Map<String,Object>> queryIndexContent(String indexName, String condition) throws IOException {
|
|
|
public List<Map<String, Object>> queryIndexContent(String indexName, String condition) throws IOException {
|
|
|
//实例化查询请求对象
|
|
|
SearchRequest request = new SearchRequest();
|
|
|
//实例化SearchSourceBuilder
|
|
|
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
|
|
|
//根据索引、查询条件构建查询构造器
|
|
|
BoolQueryBuilder boolQueryBuilder = createQueryBuilderByWhere(indexName,condition);
|
|
|
BoolQueryBuilder boolQueryBuilder = createQueryBuilderByWhere(indexName, condition);
|
|
|
//将查询构造器注入SearchSourceBuilder
|
|
|
searchBuilder.query(boolQueryBuilder);
|
|
|
//设置请求查询的索引(查询构造器中已指定,无需重复设置)
|
|
@ -314,10 +309,10 @@ public class ElasticSearch7Util {
|
|
|
SearchHits hits = searchResponse.getHits();
|
|
|
|
|
|
SearchHit[] searchHits = hits.getHits();
|
|
|
List<Map<String,Object>> listData = new ArrayList<>();
|
|
|
List<Map<String, Object>> listData = new ArrayList<>();
|
|
|
//遍历查询结果
|
|
|
for(SearchHit hit : searchHits){
|
|
|
Map<String,Object> datas = hit.getSourceAsMap();
|
|
|
for (SearchHit hit : searchHits) {
|
|
|
Map<String, Object> datas = hit.getSourceAsMap();
|
|
|
listData.add(datas);
|
|
|
logger.info(datas.toString());
|
|
|
}
|
|
@ -327,6 +322,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 查询语句转换
|
|
|
*
|
|
|
* @param filters
|
|
|
* @return
|
|
|
*/
|
|
@ -335,24 +331,24 @@ public class ElasticSearch7Util {
|
|
|
if (StringUtils.isEmpty(filters)) {
|
|
|
return boolQueryBuilder;
|
|
|
}
|
|
|
String [] filterArr = filters.split(";");
|
|
|
String[] filterArr = filters.split(";");
|
|
|
for (String filter : filterArr) {
|
|
|
if (filter.contains("||")){
|
|
|
String [] fields = filter.split("\\|\\|");
|
|
|
if (filter.contains("||")) {
|
|
|
String[] fields = filter.split("\\|\\|");
|
|
|
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
|
|
|
for (String filed : fields) {
|
|
|
String [] condition = filed.split("=");
|
|
|
String[] condition = filed.split("=");
|
|
|
queryBuilder.should(QueryBuilders.termQuery(condition[0], condition[1]));
|
|
|
}
|
|
|
boolQueryBuilder.must(queryBuilder);
|
|
|
} else if (filter.contains("?")) {
|
|
|
String [] condition = filter.split("\\?");
|
|
|
String[] condition = filter.split("\\?");
|
|
|
MatchPhraseQueryBuilder matchPhraseQueryBuilder = QueryBuilders.matchPhraseQuery(condition[0], condition[1]);
|
|
|
boolQueryBuilder.must(matchPhraseQueryBuilder);
|
|
|
} else if (filter.contains("<>")) {
|
|
|
String [] condition = filter.split("<>");
|
|
|
String[] condition = filter.split("<>");
|
|
|
if (condition[1].contains(",")) {
|
|
|
String [] inCondition = condition[1].split(",");
|
|
|
String[] inCondition = condition[1].split(",");
|
|
|
TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(condition[0], inCondition);
|
|
|
boolQueryBuilder.mustNot(termsQueryBuilder);
|
|
|
} else {
|
|
@ -360,29 +356,29 @@ public class ElasticSearch7Util {
|
|
|
boolQueryBuilder.mustNot(termQueryBuilder);
|
|
|
}
|
|
|
} else if (filter.contains(">=")) {
|
|
|
String [] condition = filter.split(">=");
|
|
|
String[] condition = filter.split(">=");
|
|
|
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
|
|
|
rangeQueryBuilder.gte(condition[1]);
|
|
|
boolQueryBuilder.must(rangeQueryBuilder);
|
|
|
} else if (filter.contains(">")) {
|
|
|
String [] condition = filter.split(">");
|
|
|
String[] condition = filter.split(">");
|
|
|
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
|
|
|
rangeQueryBuilder.gt(condition[1]);
|
|
|
boolQueryBuilder.must(rangeQueryBuilder);
|
|
|
} else if (filter.contains("<=")) {
|
|
|
String [] condition = filter.split("<=");
|
|
|
String[] condition = filter.split("<=");
|
|
|
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
|
|
|
rangeQueryBuilder.lte(condition[1]);
|
|
|
boolQueryBuilder.must(rangeQueryBuilder);
|
|
|
} else if (filter.contains("<")) {
|
|
|
String [] condition = filter.split("<");
|
|
|
String[] condition = filter.split("<");
|
|
|
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
|
|
|
rangeQueryBuilder.lt(condition[1]);
|
|
|
boolQueryBuilder.must(rangeQueryBuilder);
|
|
|
} else if (filter.contains("=")) {
|
|
|
String [] condition = filter.split("=");
|
|
|
String[] condition = filter.split("=");
|
|
|
if (condition[1].contains(",")) {
|
|
|
String [] inCondition = condition[1].split(",");
|
|
|
String[] inCondition = condition[1].split(",");
|
|
|
TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(condition[0], inCondition);
|
|
|
boolQueryBuilder.must(termsQueryBuilder);
|
|
|
} else {
|
|
@ -396,11 +392,12 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 多条件查询
|
|
|
*
|
|
|
* @param mustMap
|
|
|
* @param length
|
|
|
* @return
|
|
|
*/
|
|
|
public List<String> multiSearch(Map<String,Object> mustMap,int length,String index) {
|
|
|
public List<String> multiSearch(Map<String, Object> mustMap, int length, String index) {
|
|
|
// 根据多个条件 生成 boolQueryBuilder
|
|
|
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
|
|
|
|
|
@ -416,15 +413,16 @@ public class ElasticSearch7Util {
|
|
|
searchSourceBuilder.size(length);
|
|
|
|
|
|
// 其中listSearchResult是自己编写的方法,以供多中查询方式使用。
|
|
|
return listSearchResult(searchSourceBuilder,index);
|
|
|
return listSearchResult(searchSourceBuilder, index);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 用来处理搜索结果,转换成链表
|
|
|
*
|
|
|
* @param builder
|
|
|
* @return
|
|
|
*/
|
|
|
public List<String> listSearchResult(SearchSourceBuilder builder,String index) {
|
|
|
public List<String> listSearchResult(SearchSourceBuilder builder, String index) {
|
|
|
// 提交查询
|
|
|
SearchRequest searchRequest = new SearchRequest(index);
|
|
|
searchRequest.source(builder);
|
|
@ -441,7 +439,7 @@ public class ElasticSearch7Util {
|
|
|
List<String> list = new LinkedList<>();
|
|
|
SearchHits hits = searchResponse.getHits();
|
|
|
Iterator<SearchHit> iterator = hits.iterator();
|
|
|
while(iterator.hasNext()) {
|
|
|
while (iterator.hasNext()) {
|
|
|
SearchHit next = iterator.next();
|
|
|
list.add(next.getSourceAsString());
|
|
|
}
|
|
@ -449,20 +447,20 @@ public class ElasticSearch7Util {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* @param boolQueryBuilder 查询参数 build
|
|
|
* @param sortName 排序字段名称
|
|
|
* @param boolQueryBuilder 查询参数 build
|
|
|
* @param sortName 排序字段名称
|
|
|
* @return
|
|
|
*/
|
|
|
public List<Map<String, Object>> queryList(String index,BoolQueryBuilder boolQueryBuilder,String sortName,Integer size) throws IOException {
|
|
|
public List<Map<String, Object>> queryList(String index, BoolQueryBuilder boolQueryBuilder, String sortName, Integer size) throws IOException {
|
|
|
//实例化查询请求对象
|
|
|
SearchRequest request = new SearchRequest();
|
|
|
//实例化SearchSourceBuilder
|
|
|
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
|
|
|
//将查询构造器注入SearchSourceBuilder
|
|
|
if (StringUtils.isNoneBlank(sortName)){
|
|
|
if (StringUtils.isNoneBlank(sortName)) {
|
|
|
searchBuilder.query(boolQueryBuilder).sort(sortName, SortOrder.DESC);
|
|
|
}
|
|
|
if (size!=null){
|
|
|
if (size != null) {
|
|
|
searchBuilder.size(size);
|
|
|
}
|
|
|
//设置请求查询的索引(查询构造器中已指定,无需重复设置)
|
|
@ -477,10 +475,10 @@ public class ElasticSearch7Util {
|
|
|
SearchHits hits = searchResponse.getHits();
|
|
|
|
|
|
SearchHit[] searchHits = hits.getHits();
|
|
|
List<Map<String,Object>> listData = new ArrayList<>();
|
|
|
List<Map<String, Object>> listData = new ArrayList<>();
|
|
|
//遍历查询结果
|
|
|
for(SearchHit hit : searchHits){
|
|
|
Map<String,Object> datas = hit.getSourceAsMap();
|
|
|
for (SearchHit hit : searchHits) {
|
|
|
Map<String, Object> datas = hit.getSourceAsMap();
|
|
|
listData.add(datas);
|
|
|
logger.info(datas.toString());
|
|
|
}
|
|
@ -510,16 +508,17 @@ public class ElasticSearch7Util {
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
/**
|
|
|
* 创建映射
|
|
|
* 注意:保存数据之前如果没有创建相应的字
|
|
|
* 段映射会导致搜索结果不准确
|
|
|
* 注意:保存数据之前如果没有创建相应的字
|
|
|
* 段映射会导致搜索结果不准确
|
|
|
*
|
|
|
* @param index
|
|
|
* @param source
|
|
|
* @param setting - 该设置根据需要进行配置
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void mapping (String index, Map<String, Map<String, String>> source, Map<String, Object> setting) throws IOException{
|
|
|
public void mapping(String index, Map<String, Map<String, String>> source, Map<String, Object> setting) throws IOException {
|
|
|
RestHighLevelClient restHighLevelClient1 = elasticSearch7Pool.restHighLevelClient();
|
|
|
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("properties");
|
|
|
for (String field : source.keySet()) {
|
|
@ -548,8 +547,9 @@ public class ElasticSearch7Util {
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
/**
|
|
|
* 添加数据
|
|
|
*
|
|
|
* @param index
|
|
|
* @param source
|
|
|
* @return
|
|
@ -560,7 +560,8 @@ public class ElasticSearch7Util {
|
|
|
if (StringUtils.isEmpty(_id)) {
|
|
|
IndexRequest request = new IndexRequest(index);
|
|
|
request.source(source);
|
|
|
IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);;
|
|
|
IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
|
|
|
;
|
|
|
source.put("_id", response.getId());
|
|
|
} else {
|
|
|
UpdateRequest request = new UpdateRequest();
|
|
@ -573,12 +574,13 @@ public class ElasticSearch7Util {
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
/**
|
|
|
* 批量删除数据
|
|
|
*
|
|
|
* @param index
|
|
|
* @param idArr
|
|
|
*/
|
|
|
public void bulkDelete (String index, String [] idArr) throws IOException {
|
|
|
public void bulkDelete(String index, String[] idArr) throws IOException {
|
|
|
if (idArr.length > 0) {
|
|
|
BulkRequest bulkRequest = new BulkRequest();
|
|
|
for (String id : idArr) {
|
|
@ -601,15 +603,16 @@ public class ElasticSearch7Util {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
/**
|
|
|
* 根据条件批量删除数据
|
|
|
*
|
|
|
* @param index
|
|
|
* @param queryBuilder
|
|
|
*/
|
|
|
public void deleteByFilter(String index, QueryBuilder queryBuilder) throws IOException {
|
|
|
public void deleteByFilter(String index, QueryBuilder queryBuilder) throws IOException {
|
|
|
long count = count(index, queryBuilder);
|
|
|
long page = count/10000 == 0 ? 1 :count/10000 +1;
|
|
|
for (long i =0;i<page;i++) {
|
|
|
long page = count / 10000 == 0 ? 1 : count / 10000 + 1;
|
|
|
for (long i = 0; i < page; i++) {
|
|
|
List<String> idList = getIds(index, queryBuilder);
|
|
|
if (idList.size() > 0) {
|
|
|
String[] idArr = new String[idList.size()];
|
|
@ -638,6 +641,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 根据字段批量删除数据
|
|
|
*
|
|
|
* @param index
|
|
|
* @param type
|
|
|
* @param field
|
|
@ -649,6 +653,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 根据条件批量删除数据
|
|
|
*
|
|
|
* @param index
|
|
|
* @param type
|
|
|
* @param filters
|
|
@ -658,7 +663,7 @@ public class ElasticSearch7Util {
|
|
|
deleteByFilter(index, queryBuilder);
|
|
|
}
|
|
|
|
|
|
/* *//**
|
|
|
/* *//**
|
|
|
* 根据条件批量删除数据
|
|
|
* @param index
|
|
|
* @param type
|
|
@ -696,41 +701,45 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 更新数据 - 返回最新文档
|
|
|
*
|
|
|
* @param index
|
|
|
* @param id
|
|
|
* @param source
|
|
|
* @return
|
|
|
* @throws DocumentMissingException
|
|
|
*/
|
|
|
public Map<String, Object> update(String index, String id, Map<String, Object> source) throws DocumentMissingException, IOException {
|
|
|
public Map<String, Object> update(String index, String id, Map<String, Object> source) throws DocumentMissingException, IOException {
|
|
|
source.remove("_id");
|
|
|
UpdateRequest request = new UpdateRequest();
|
|
|
request.index(index).id("_id");
|
|
|
request.doc(JSONObject.toJSONString(source), XContentType.JSON);
|
|
|
request.retryOnConflict(5);
|
|
|
System.out.println("request==>" + JSON.toJSONString(request));
|
|
|
UpdateResponse response = restHighLevelClient.update(request, RequestOptions.DEFAULT);
|
|
|
return findById(index, id);
|
|
|
return findById(index, id);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 更新数据 - 不返回文档
|
|
|
*
|
|
|
* @param index
|
|
|
* @param id
|
|
|
* @param source
|
|
|
* @throws DocumentMissingException
|
|
|
*/
|
|
|
public void voidUpdate (String index, String id, Map<String, Object> source) throws IOException {
|
|
|
public void voidUpdate(String index, String id, Map<String, Object> source) throws IOException {
|
|
|
source.remove("_id");
|
|
|
UpdateRequest request = new UpdateRequest();
|
|
|
request.index(index);
|
|
|
request.id(id);
|
|
|
request.doc(source);
|
|
|
request.retryOnConflict(5);
|
|
|
UpdateResponse response = restHighLevelClient.update(request,RequestOptions.DEFAULT);
|
|
|
UpdateResponse response = restHighLevelClient.update(request, RequestOptions.DEFAULT);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 批量更新数据
|
|
|
*
|
|
|
* @param index
|
|
|
* @param source
|
|
|
* @throws DocumentMissingException
|
|
@ -739,7 +748,7 @@ public class ElasticSearch7Util {
|
|
|
if (source.size() > 0) {
|
|
|
BulkRequest bulkRequest = new BulkRequest();
|
|
|
source.forEach(item -> {
|
|
|
String _id = (String)item.remove("_id");
|
|
|
String _id = (String) item.remove("_id");
|
|
|
if (!org.springframework.util.StringUtils.isEmpty(_id)) {
|
|
|
UpdateRequest request = new UpdateRequest();
|
|
|
request.index(index);
|
|
@ -747,12 +756,13 @@ public class ElasticSearch7Util {
|
|
|
bulkRequest.add(request);
|
|
|
}
|
|
|
});
|
|
|
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest,RequestOptions.DEFAULT);
|
|
|
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据ID查找数据
|
|
|
*
|
|
|
* @param index
|
|
|
* @param id
|
|
|
* @return
|
|
@ -770,6 +780,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 根据字段查找数据
|
|
|
*
|
|
|
* @param index
|
|
|
* @param field
|
|
|
* @param value
|
|
@ -781,28 +792,31 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 获取文档列表
|
|
|
*
|
|
|
* @param index
|
|
|
* @param filters
|
|
|
* @return
|
|
|
*/
|
|
|
public List<Map<String, Object>> list(String index, String filters) throws IOException {
|
|
|
public List<Map<String, Object>> list(String index, String filters) throws IOException {
|
|
|
QueryBuilder queryBuilder = getQueryBuilder(filters);
|
|
|
System.out.println("queryBuilder==>" + JSON.toJSONString(queryBuilder));
|
|
|
return list(index, queryBuilder);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取文档列表
|
|
|
*
|
|
|
* @param index
|
|
|
* @param queryBuilder
|
|
|
* @return
|
|
|
*/
|
|
|
public List<Map<String, Object>> list(String index, QueryBuilder queryBuilder) throws IOException {
|
|
|
int size = (int)count(index, queryBuilder);
|
|
|
public List<Map<String, Object>> list(String index, QueryBuilder queryBuilder) throws IOException {
|
|
|
int size = (int) count(index, queryBuilder);
|
|
|
SearchSourceBuilder builder = SearchSourceBuilder(queryBuilder, null, 0, size);
|
|
|
SearchRequest request = new SearchRequest(index);
|
|
|
request.searchType(SearchType.DFS_QUERY_THEN_FETCH);
|
|
|
request.source(builder);
|
|
|
SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
|
|
|
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
SearchHits hits = response.getHits();
|
|
|
List<Map<String, Object>> resultList = new ArrayList<Map<String, Object>>();
|
|
|
for (SearchHit hit : hits.getHits()) {
|
|
@ -815,6 +829,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 获取文档分页
|
|
|
*
|
|
|
* @param index
|
|
|
* @param type
|
|
|
* @param filters
|
|
@ -828,6 +843,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 获取文档分页
|
|
|
*
|
|
|
* @param index
|
|
|
* @param type
|
|
|
* @param filters
|
|
@ -842,6 +858,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 获取分档分页 - 带分页功能
|
|
|
*
|
|
|
* @param index
|
|
|
* @param type
|
|
|
* @param filters
|
|
@ -858,6 +875,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 获取分档分页 - 带分页功能
|
|
|
*
|
|
|
* @param index
|
|
|
* @param type
|
|
|
* @param queryBuilder
|
|
@ -871,7 +889,7 @@ public class ElasticSearch7Util {
|
|
|
SearchRequest request = new SearchRequest(index);
|
|
|
request.searchType(SearchType.DFS_QUERY_THEN_FETCH);
|
|
|
request.source(builder);
|
|
|
SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
|
|
|
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
SearchHits hits = response.getHits();
|
|
|
List<Map<String, Object>> resultList = new ArrayList<>();
|
|
|
for (SearchHit hit : hits.getHits()) {
|
|
@ -879,36 +897,37 @@ public class ElasticSearch7Util {
|
|
|
source.put("_id", hit.getId());
|
|
|
resultList.add(source);
|
|
|
}
|
|
|
PageRequest pageRequest = PageRequest.of(page-1,size);
|
|
|
PageRequest pageRequest = PageRequest.of(page - 1, size);
|
|
|
return new PageImpl<>(resultList, pageRequest, hits.getTotalHits().value);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取ID列表
|
|
|
*
|
|
|
* @param index
|
|
|
* @param filters
|
|
|
* @return
|
|
|
*/
|
|
|
public List<String> getIds (String index, String filters) throws IOException {
|
|
|
public List<String> getIds(String index, String filters) throws IOException {
|
|
|
QueryBuilder queryBuilder = getQueryBuilder(filters);
|
|
|
return getIds(index, queryBuilder);
|
|
|
return getIds(index, queryBuilder);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取ID列表
|
|
|
*
|
|
|
* @param index
|
|
|
* @param queryBuilder
|
|
|
* 最多只能一万条
|
|
|
* @param queryBuilder 最多只能一万条
|
|
|
* @return
|
|
|
*/
|
|
|
public List<String> getIds (String index, QueryBuilder queryBuilder) throws IOException {
|
|
|
int size = (int)count(index,queryBuilder);
|
|
|
size = size > 10000 ? 10000:size;
|
|
|
public List<String> getIds(String index, QueryBuilder queryBuilder) throws IOException {
|
|
|
int size = (int) count(index, queryBuilder);
|
|
|
size = size > 10000 ? 10000 : size;
|
|
|
SearchSourceBuilder builder = SearchSourceBuilder(queryBuilder, null, 0, size);
|
|
|
SearchRequest request = new SearchRequest(index);
|
|
|
request.searchType(SearchType.DFS_QUERY_THEN_FETCH);
|
|
|
request.source(builder);
|
|
|
SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
|
|
|
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
SearchHits hits = response.getHits();
|
|
|
List<String> resultList = new ArrayList<>();
|
|
|
for (SearchHit hit : hits.getHits()) {
|
|
@ -919,6 +938,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 获取文档数
|
|
|
*
|
|
|
* @param index
|
|
|
* @param type
|
|
|
* @param filters
|
|
@ -926,11 +946,12 @@ public class ElasticSearch7Util {
|
|
|
*/
|
|
|
public long count(String index, String type, String filters) throws IOException {
|
|
|
QueryBuilder queryBuilder = getQueryBuilder(filters);
|
|
|
return count(index,queryBuilder);
|
|
|
return count(index, queryBuilder);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取文档数
|
|
|
*
|
|
|
* @param index
|
|
|
* @param queryBuilder
|
|
|
* @return
|
|
@ -940,17 +961,14 @@ public class ElasticSearch7Util {
|
|
|
SearchRequest request = new SearchRequest(index);
|
|
|
request.searchType(SearchType.DFS_QUERY_THEN_FETCH);
|
|
|
request.source(builder);
|
|
|
SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
|
|
|
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
return response.getHits().getTotalHits().value;
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 根据日期分组
|
|
|
*
|
|
|
* @param index
|
|
|
* @param filters
|
|
|
* @param start
|
|
@ -974,7 +992,7 @@ public class ElasticSearch7Util {
|
|
|
builder.aggregation(dateHistogramBuilder);
|
|
|
SearchRequest request = new SearchRequest(index);
|
|
|
request.source(builder);
|
|
|
SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
|
|
|
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
Histogram histogram = response.getAggregations().get(index + "-" + field);
|
|
|
Map<String, Long> temp = new HashMap<>();
|
|
|
histogram.getBuckets().forEach(item -> temp.put(item.getKeyAsString(), item.getDocCount()));
|
|
@ -983,6 +1001,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 查询去重数量
|
|
|
*
|
|
|
* @param index
|
|
|
* @param filters
|
|
|
* @param filed
|
|
@ -995,13 +1014,14 @@ public class ElasticSearch7Util {
|
|
|
builder.aggregation(cardinality);
|
|
|
SearchRequest request = new SearchRequest(index);
|
|
|
request.source(builder);
|
|
|
SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
|
|
|
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
InternalCardinality internalCard = response.getAggregations().get("cardinality");
|
|
|
return new Double(internalCard.getProperty("value").toString()).intValue();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 分组统计
|
|
|
*
|
|
|
* @param index
|
|
|
* @param filters
|
|
|
* @param groupField
|
|
@ -1013,7 +1033,7 @@ public class ElasticSearch7Util {
|
|
|
AbstractAggregationBuilder aggregation = AggregationBuilders.terms("count").field(groupField);
|
|
|
builder.aggregation(aggregation);
|
|
|
SearchRequest request = new SearchRequest(index);
|
|
|
SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
|
|
|
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
Terms terms = response.getAggregations().get("count");
|
|
|
List<Terms.Bucket> buckets = (List<Terms.Bucket>) terms.getBuckets();
|
|
|
Map<String, Long> groupMap = new HashMap<>();
|
|
@ -1026,6 +1046,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 分组求和
|
|
|
*
|
|
|
* @param index
|
|
|
* @param filters
|
|
|
* @param sumField
|
|
@ -1034,17 +1055,17 @@ public class ElasticSearch7Util {
|
|
|
*/
|
|
|
public Map<String, Double> sumByGroup(String index, String filters, String sumField, String groupField) throws IOException {
|
|
|
QueryBuilder queryBuilder = getQueryBuilder(filters);
|
|
|
SearchSourceBuilder builder = SearchSourceBuilder( queryBuilder, null, null, null);
|
|
|
SearchSourceBuilder builder = SearchSourceBuilder(queryBuilder, null, null, null);
|
|
|
TermsAggregationBuilder aggregation = AggregationBuilders.terms("sum_query").field(groupField);
|
|
|
SumAggregationBuilder sumBuilder= AggregationBuilders.sum("sum_row").field(sumField);
|
|
|
SumAggregationBuilder sumBuilder = AggregationBuilders.sum("sum_row").field(sumField);
|
|
|
aggregation.subAggregation(sumBuilder);
|
|
|
builder.aggregation(aggregation);
|
|
|
SearchRequest request = new SearchRequest(index);
|
|
|
SearchResponse response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
|
|
|
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
Terms terms = response.getAggregations().get("sum_query");
|
|
|
List<Terms.Bucket> buckets = (List<Terms.Bucket>) terms.getBuckets();
|
|
|
Map<String, Double> groupMap = new HashMap<>();
|
|
|
for (Terms.Bucket bucket : buckets){
|
|
|
for (Terms.Bucket bucket : buckets) {
|
|
|
Sum sum2 = bucket.getAggregations().get("sum_row");
|
|
|
groupMap.put(bucket.getKey().toString(), sum2.getValue());
|
|
|
}
|
|
@ -1053,6 +1074,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 获取基础请求生成器
|
|
|
*
|
|
|
* @param queryBuilder
|
|
|
* @param sortBuilders
|
|
|
* @return
|
|
@ -1075,6 +1097,7 @@ public class ElasticSearch7Util {
|
|
|
|
|
|
/**
|
|
|
* 排序语句转换
|
|
|
*
|
|
|
* @param sorts
|
|
|
* @return
|
|
|
*/
|
|
@ -1083,7 +1106,7 @@ public class ElasticSearch7Util {
|
|
|
if (org.springframework.util.StringUtils.isEmpty(sorts)) {
|
|
|
return sortBuilderList;
|
|
|
}
|
|
|
String [] sortArr = sorts.split(";");
|
|
|
String[] sortArr = sorts.split(";");
|
|
|
for (String sort : sortArr) {
|
|
|
String operator = sort.substring(0, 1);
|
|
|
SortBuilder sortBuilder = new FieldSortBuilder(sort.substring(1));
|
|
@ -1100,8 +1123,9 @@ public class ElasticSearch7Util {
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
/**
|
|
|
* 根据SQL查找数据
|
|
|
*
|
|
|
* @param field
|
|
|
* @param sql
|
|
|
* @return
|
|
@ -1121,7 +1145,7 @@ public class ElasticSearch7Util {
|
|
|
boolQuery = QueryMaker.explan(where);
|
|
|
}
|
|
|
} catch (SqlParseException e) {
|
|
|
logger.info("ReadES.createQueryBuilderByExpress-Exception,"+e.getMessage());
|
|
|
logger.info("ReadES.createQueryBuilderByExpress-Exception," + e.getMessage());
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
//实例化查询请求对象
|
|
@ -1142,18 +1166,18 @@ public class ElasticSearch7Util {
|
|
|
SearchHits hits = searchResponse.getHits();
|
|
|
|
|
|
SearchHit[] searchHits = hits.getHits();
|
|
|
List<Map<String,Object>> listData = new ArrayList<>();
|
|
|
List<Map<String, Object>> listData = new ArrayList<>();
|
|
|
//遍历查询结果
|
|
|
for(SearchHit hit : searchHits){
|
|
|
Map<String,Object> datas = hit.getSourceAsMap();
|
|
|
Map<String,Object> result = new HashMap<>();
|
|
|
for (SearchHit hit : searchHits) {
|
|
|
Map<String, Object> datas = hit.getSourceAsMap();
|
|
|
Map<String, Object> result = new HashMap<>();
|
|
|
for (String _field : field) {
|
|
|
result.put(_field, datas.get(_field));
|
|
|
}
|
|
|
listData.add(result);
|
|
|
logger.info(result.toString());
|
|
|
}
|
|
|
return listData;
|
|
|
return listData;
|
|
|
}
|
|
|
|
|
|
|