package com.yihu.jw.elasticsearch;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yihu.jw.elasticsearch.model.ESIDEntity;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Helper around the Elasticsearch {@link RestHighLevelClient} for indexing, updating,
 * deleting and searching documents. Documents are (de)serialised with fastjson.
 *
 * Created by chenweida on 2017/6/2.
 */
@Component
public class ElasticSearch7Helper {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearch7Helper.class);

    /** Timeout used by the bulk save / update / delete operations. */
    private static final TimeValue LONG_TIMEOUT = TimeValue.timeValueSeconds(60);

    /** Timeout used by single-document operations and the "custom id" / "by map" bulk variants. */
    private static final TimeValue SHORT_TIMEOUT = TimeValue.timeValueSeconds(10);

    @Resource(name = "restHighLevelClient")
    private RestHighLevelClient restHighLevelClient;

    /**
     * Indexes a single object, serialised to JSON, through a one-item bulk request.
     * No document id is set, so Elasticsearch generates one.
     *
     * @param index target index
     * @param t     object to serialise and index
     * @return {@code true} when the bulk response reports no failures
     * @throws IOException if the request to Elasticsearch fails
     */
    public <T> Boolean saveOne(String index, T t) throws IOException {
        BulkRequest bulkRequest = newBulkRequest(LONG_TIMEOUT);
        bulkRequest.add(new IndexRequest(index).source(JSON.toJSONString(t), XContentType.JSON));
        return executeBulk(bulkRequest);
    }

    /**
     * Indexes every object of {@code sources} in one bulk request.
     * No document ids are set, so Elasticsearch generates them.
     *
     * @param index   target index
     * @param sources objects to serialise and index
     * @return {@code true} when the bulk response reports no failures (or there was nothing to index)
     * @throws IOException if the request to Elasticsearch fails
     */
    public <T> Boolean save(String index, List<T> sources) throws IOException {
        BulkRequest bulkRequest = newBulkRequest(LONG_TIMEOUT);
        for (T source : sources) {
            bulkRequest.add(new IndexRequest(index).source(JSON.toJSONString(source), XContentType.JSON));
        }
        return executeBulk(bulkRequest);
    }

    /**
     * Indexes a single JSON document with an id generated by Elasticsearch.
     *
     * @param index  target index
     * @param source document as a JSON string
     * @return the status of the index response
     * @throws IOException if the request to Elasticsearch fails
     */
    public RestStatus save(String index, String source) throws IOException {
        IndexRequest indexRequest = new IndexRequest(index);
        indexRequest.timeout(SHORT_TIMEOUT);
        indexRequest.source(source, XContentType.JSON);
        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        return indexResponse.status();
    }

    /**
     * Indexes a single JSON document under a caller-supplied id.
     *
     * @param index         target index
     * @param source        document as a JSON string
     * @param idFieldString the document id itself (despite the name, it is not looked up in {@code source})
     * @return the status of the index response
     * @throws IOException if the request to Elasticsearch fails
     */
    public RestStatus saveWithCustomId(String index, String source, String idFieldString) throws IOException {
        IndexRequest indexRequest = new IndexRequest(index);
        indexRequest.timeout(SHORT_TIMEOUT);
        indexRequest.id(idFieldString).source(source, XContentType.JSON);
        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        return indexResponse.status();
    }

    /**
     * Indexes JSON documents in bulk, taking each document id from a field of the document.
     *
     * @param index         target index
     * @param sources       documents as JSON strings
     * @param idFieldString name of the JSON field whose value is used as the document id
     * @return {@code true} when the bulk response reports no failures (or there was nothing to index)
     * @throws IOException if the request to Elasticsearch fails
     */
    public Boolean saveBulkWithCustomId(String index, List<String> sources, String idFieldString) throws IOException {
        BulkRequest bulkRequest = newBulkRequest(SHORT_TIMEOUT);
        for (String source : sources) {
            JSONObject jsonObject = JSON.parseObject(source);
            // The source is already JSON text: send it as-is. Re-serialising the String with
            // JSON.toJSONString would wrap it in quotes and produce a JSON string, not an object.
            bulkRequest.add(new IndexRequest(index)
                    .id(jsonObject.getString(idFieldString))
                    .source(source, XContentType.JSON));
        }
        return executeBulk(bulkRequest);
    }

    /**
     * Partially updates documents in bulk. Every element must be an {@link ESIDEntity};
     * its {@code getId()} is used as the document id and its JSON form as the partial document.
     *
     * @param index   target index
     * @param sources entities to update
     * @return {@code true} when the bulk response reports no failures (or there was nothing to update)
     * @throws IOException        if the request to Elasticsearch fails
     * @throws ClassCastException if an element is not an {@link ESIDEntity}
     */
    public <T> Boolean update(String index, List<T> sources) throws IOException {
        BulkRequest bulkRequest = newBulkRequest(LONG_TIMEOUT);
        for (T source : sources) {
            String id = ((ESIDEntity) source).getId();
            bulkRequest.add(new UpdateRequest(index, id).doc(JSON.toJSONString(source), XContentType.JSON));
        }
        return executeBulk(bulkRequest);
    }

    /**
     * Partially updates documents in bulk from maps. The value under the {@code "id"} key
     * is used as the document id and the whole map as the partial document.
     *
     * @param index target index
     * @param type  unused; kept so existing callers keep compiling
     * @param list  one map per document, each containing an {@code "id"} entry
     * @return {@code true} when the bulk response reports no failures (or there was nothing to update)
     * @throws IOException if the request to Elasticsearch fails
     */
    public Boolean updateByMap(String index, String type, List<? extends Map<String, ?>> list) throws IOException {
        BulkRequest bulkRequest = newBulkRequest(SHORT_TIMEOUT);
        for (Map<String, ?> map : list) {
            bulkRequest.add(new UpdateRequest(index, String.valueOf(map.get("id")))
                    .doc(JSON.toJSONString(map), XContentType.JSON));
        }
        return executeBulk(bulkRequest);
    }

    /**
     * Partially updates one document.
     *
     * @param index  target index
     * @param _id    document id
     * @param source fields to merge into the document
     * @return always {@code true}; problems surface as exceptions from the client
     * @throws IOException if the request to Elasticsearch fails
     */
    public boolean update(String index, String _id, JSONObject source) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(index, _id);
        updateRequest.timeout(SHORT_TIMEOUT);
        updateRequest.doc(source.toJSONString(), XContentType.JSON);
        UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        logger.debug("update info: {}", updateResponse.status());
        return true;
    }

    /**
     * Deletes documents in bulk. The document id is read from the {@code "_id"} entry of each map.
     *
     * @param index target index
     * @param datas one map per document, each containing a non-null {@code "_id"} entry
     * @return {@code true} when the bulk response reports no failures (or there was nothing to delete)
     * @throws IOException          if the request to Elasticsearch fails
     * @throws NullPointerException if a map has no {@code "_id"} value
     */
    public boolean delete(String index, List<? extends Map<String, ?>> datas) throws IOException {
        BulkRequest bulkRequest = newBulkRequest(LONG_TIMEOUT);
        for (Map<String, ?> map : datas) {
            bulkRequest.add(new DeleteRequest(index).id(map.get("_id").toString()));
        }
        return executeBulk(bulkRequest);
    }

    /**
     * Runs a search and converts the first hit into a bean.
     *
     * @param index     index to search
     * @param queryStr  search source (query, paging, sorting ...)
     * @param beanClass type to deserialise the hit's {@code _source} into
     * @return the first hit as {@code beanClass}, or {@code null} when there is no hit
     * @throws Exception if the search or the deserialisation fails
     */
    public <T> T searchOne(String index, SearchSourceBuilder queryStr, Class<T> beanClass) throws Exception {
        SearchHit[] searchHits = executeSearch(index, queryStr).getHits();
        if (searchHits.length == 0) {
            return null;
        }
        return toBean(searchHits[0], beanClass);
    }

    /**
     * Runs a search and converts every returned hit into a bean.
     *
     * @param index     index to search
     * @param queryStr  search source (query, paging, sorting ...)
     * @param beanClass type to deserialise each hit's {@code _source} into
     * @return the hits of the response, in response order; empty when nothing matched
     * @throws Exception if the search or the deserialisation fails
     */
    public <T> List<T> search(String index, SearchSourceBuilder queryStr, Class<T> beanClass) throws Exception {
        SearchHit[] searchHits = executeSearch(index, queryStr).getHits();
        List<T> listData = new ArrayList<>(searchHits.length);
        for (SearchHit hit : searchHits) {
            listData.add(toBean(hit, beanClass));
            logger.debug("search hit: {}", hit.getSourceAsString());
        }
        return listData;
    }

    /**
     * Runs a search and returns every hit as a JSON string, with the Elasticsearch
     * document id added under the {@code "id"} key.
     *
     * @param index    index to search
     * @param queryStr search source (query, paging, sorting ...)
     * @return one JSON string per hit, in response order
     * @throws Exception if the search fails
     */
    public List<String> search(String index, SearchSourceBuilder queryStr) throws Exception {
        SearchHit[] searchHits = executeSearch(index, queryStr).getHits();
        List<String> listData = new ArrayList<>(searchHits.length);
        for (SearchHit hit : searchHits) {
            JSONObject object = JSONObject.parseObject(hit.getSourceAsString());
            object.put("id", hit.getId());
            listData.add(object.toJSONString());
        }
        return listData;
    }

    /**
     * Returns the number of hits contained in the search response.
     *
     * NOTE(review): this is the size of the returned page, not the total number of matching
     * documents, so it is bounded by the {@code size} set on {@code queryStr}. Confirm whether
     * callers actually need the total hit count instead.
     *
     * @param index    index to search
     * @param queryStr search source (query, paging, sorting ...)
     * @return number of hits in the response
     * @throws Exception if the search fails
     */
    public int esCount(String index, SearchSourceBuilder queryStr) throws Exception {
        return executeSearch(index, queryStr).getHits().length;
    }

    /**
     * Fetches one document by id.
     *
     * @param index index to read from
     * @param id    document id
     * @return the document's {@code _source} as a JSON string, as reported by the get response
     * @throws IOException if the request to Elasticsearch fails
     */
    public String search(String index, String id) throws IOException {
        GetRequest request = new GetRequest(index, id);
        GetResponse getResponse = restHighLevelClient.get(request, RequestOptions.DEFAULT);
        return getResponse.getSourceAsString();
    }

    /**
     * Creates an instance of {@code beanClass} and assigns its declared, non-static, non-final
     * fields from the map entries with the same name. Fields without a matching key keep the
     * value set by the constructor; a {@code null} value is never written to a primitive field.
     *
     * @param map       field name to value; may be {@code null}
     * @param beanClass class with a no-argument constructor
     * @return the populated instance, or {@code null} when {@code map} is {@code null}
     * @throws Exception if the instance cannot be created or a value does not fit its field
     */
    public static Object mapToObject(Map<String, ?> map, Class<?> beanClass) throws Exception {
        if (map == null) {
            return null;
        }
        Object obj = beanClass.getDeclaredConstructor().newInstance();
        for (Field field : beanClass.getDeclaredFields()) {
            int mod = field.getModifiers();
            if (Modifier.isStatic(mod) || Modifier.isFinal(mod)) {
                continue;
            }
            String name = field.getName();
            if (!map.containsKey(name)) {
                continue;
            }
            Object value = map.get(name);
            if (value == null && field.getType().isPrimitive()) {
                // Field.set rejects null for primitive fields; keep the default value instead.
                continue;
            }
            field.setAccessible(true);
            field.set(obj, value);
        }
        return obj;
    }

    /** Creates an empty bulk request with the given timeout. */
    private static BulkRequest newBulkRequest(TimeValue timeout) {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout(timeout);
        return bulkRequest;
    }

    /**
     * Executes a bulk request and logs the failure message only when something actually failed.
     * An empty request is treated as success without calling Elasticsearch, because the client
     * rejects bulk requests that contain no actions.
     */
    private boolean executeBulk(BulkRequest bulkRequest) throws IOException {
        if (bulkRequest.numberOfActions() == 0) {
            return true;
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (bulkResponse.hasFailures()) {
            logger.warn(bulkResponse.buildFailureMessage());
            return false;
        }
        return true;
    }

    /** Runs the search against {@code index} and returns the hits section of the response. */
    private SearchHits executeSearch(String index, SearchSourceBuilder queryStr) throws IOException {
        SearchRequest request = new SearchRequest().indices(index).source(queryStr);
        SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        return searchResponse.getHits();
    }

    /**
     * Deserialises a hit's {@code _source} JSON into {@code beanClass} with fastjson.
     * Returns {@code null} when the hit carries no source.
     */
    private static <T> T toBean(SearchHit hit, Class<T> beanClass) {
        String json = hit.getSourceAsString();
        return json == null ? null : JSON.parseObject(json, beanClass);
    }
}