|
- package com.yihu.jw.elasticsearch;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.yihu.jw.elasticsearch.model.ESIDEntity;
- import org.elasticsearch.action.bulk.BulkRequest;
- import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.delete.DeleteRequest;
- import org.elasticsearch.action.get.GetRequest;
- import org.elasticsearch.action.get.GetResponse;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.action.index.IndexResponse;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.action.update.UpdateRequest;
- import org.elasticsearch.action.update.UpdateResponse;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.common.unit.TimeValue;
- import org.elasticsearch.common.xcontent.XContentType;
- import org.elasticsearch.rest.RestStatus;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.SearchHits;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.BeanUtils;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- import java.io.IOException;
- import java.lang.reflect.Field;
- import java.lang.reflect.Modifier;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- /**
- * Utils - 主要基于(jestClient)的搜索工具
- * Created by chenweida on 2017/6/2.
- */
- @Component
- public class ElasticSearch7Helper {
- private Logger logger = LoggerFactory.getLogger(ElasticSearch7Helper.class);
- @Resource(name="restHighLevelClient")
- private RestHighLevelClient restHighLevelClient;
- public <T> Boolean saveOne(String index, T t) throws IOException {
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.timeout(TimeValue.timeValueSeconds(60));
- bulkRequest.add(new IndexRequest(index)
- // 不指定ID的话,新增时ID是随机的
- // .id(items.get(i).getId().toString())
- .source(JSON.toJSONString(t), XContentType.JSON)
- );
- BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- logger.info(bulkResponse.buildFailureMessage());
- return !bulkResponse.hasFailures();
- }
- public <T> Boolean save(String index, List<T> sources) throws IOException {
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.timeout(TimeValue.timeValueSeconds(60));//超时时间
- for (int i = 0; i < sources.size(); i++) {
- bulkRequest.add(new IndexRequest(index)
- // 不指定ID的话,新增时ID是随机的
- // .id(items.get(i).getId().toString())
- .source(JSON.toJSONString(sources.get(i)), XContentType.JSON)
- );
- // bulkRequest.add(UpdateRequest) 批量更新
- // bulkRequest.add(DeleteRequest) 批量删除
- }
- BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- logger.info(bulkResponse.buildFailureMessage());
- return !bulkResponse.hasFailures();
- }
- public RestStatus save (String index, String source) throws IOException {
- IndexRequest indexRequest = new IndexRequest(index);
- indexRequest.timeout(TimeValue.timeValueSeconds(10));
- indexRequest.source(source, XContentType.JSON);
- IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
- return indexResponse.status();
- }
- /**
- * 自定义ID
- * @param index
- * @param source
- * @param idFieldString
- * @return
- */
- public RestStatus saveWithCustomId (String index, String source, String idFieldString) throws IOException {
- IndexRequest indexRequest = new IndexRequest(index);
- indexRequest.timeout(TimeValue.timeValueSeconds(10));
- indexRequest.id(idFieldString).source(source, XContentType.JSON);
- IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
- return indexResponse.status();
- }
- /**
- * 自定义ID
- *
- * @param index
- * @param sources
- * @param idFieldString
- * @return
- */
- public Boolean saveBulkWithCustomId (String index, List<String> sources, String idFieldString) throws IOException {
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.timeout(TimeValue.timeValueSeconds(10));
- for (int i = 0; i < sources.size(); i++) {
- JSONObject jsonObject = JSON.parseObject(sources.get(i));
- bulkRequest.add(new IndexRequest(index)
- .id(jsonObject.getString(idFieldString))
- .source(JSON.toJSONString(sources.get(i)), XContentType.JSON)
- );
- }
- BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- logger.info(bulkResponse.buildFailureMessage());
- return !bulkResponse.hasFailures();
- }
- /**
- * 更新
- * @param index
- * @param sources
- * @return
- */
- public <T> Boolean update (String index, List<T> sources) throws IOException {
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.timeout(TimeValue.timeValueSeconds(60));
- for (int i = 0; i < sources.size(); i++) {
- bulkRequest.add(new UpdateRequest(index,((ESIDEntity) sources.get(i)).getId())
- .doc(JSON.toJSONString(sources.get(i)), XContentType.JSON)
- );
- }
- BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- logger.info(bulkResponse.buildFailureMessage());
- return !bulkResponse.hasFailures();
- }
- /**
- * 更新
- * @param index
- * @param type
- * @param list
- * @return
- */
- public Boolean updateByMap(String index, String type, List<Map<String, Object>> list) throws IOException {
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.timeout(TimeValue.timeValueSeconds(10));
- for (int i = 0; i < list.size(); i++) {
- Map map = list.get(i);
- bulkRequest.add(new UpdateRequest(index,String.valueOf(map.get("id")))
- .doc(JSON.toJSONString(map), XContentType.JSON)
- );
- }
- BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- logger.info(bulkResponse.buildFailureMessage());
- return !bulkResponse.hasFailures();
- }
- /**
- * 更新
- * @param index
- * @param _id
- * @param source
- * @return
- */
- public boolean update(String index, String _id, JSONObject source) throws IOException {
- UpdateRequest updateRequest = new UpdateRequest(index, _id);
- updateRequest.timeout(TimeValue.timeValueSeconds(10));
- updateRequest.doc(source.toJSONString(), XContentType.JSON);
- UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
- logger.debug("update info: " + updateResponse.status());
- return true;
- }
- /**
- * 删除
- */
- public boolean delete(String index, List<Map<String, Object>> datas) throws IOException {
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.timeout(TimeValue.timeValueSeconds(60));
- for (Map map : datas) {
- bulkRequest.add(new DeleteRequest(index)
- .id(map.get("_id").toString()));
- }
- BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- logger.info(bulkResponse.buildFailureMessage());
- return !bulkResponse.hasFailures();
- }
- public <T> T searchOne(String index, SearchSourceBuilder queryStr,Class<T> beanClass) throws Exception {
- SearchRequest request = new SearchRequest().indices(index).source(queryStr);
- //带入请求执行查询
- SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- //得到查询结果
- SearchHits hits = searchResponse.getHits();
- SearchHit[] searchHits = hits.getHits();
- if(searchHits.length>0){
- Map<String,Object> datas = hits.getAt(0).getSourceAsMap();
- T t = beanClass.newInstance();
- BeanUtils.copyProperties(t,datas);
- return t;
- }
- return null;
- }
- public <T> List<T> search(String index, SearchSourceBuilder queryStr,Class<T> beanClass) throws Exception {
- SearchRequest request = new SearchRequest().indices(index).source(queryStr);
- //带入请求执行查询
- SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- //得到查询结果
- SearchHits hits = searchResponse.getHits();
- SearchHit[] searchHits = hits.getHits();
- List<T> listData = new ArrayList<>();
- //遍历查询结果
- for(SearchHit hit : searchHits){
- Map<String,Object> datas = hit.getSourceAsMap();
- T t = beanClass.newInstance();
- BeanUtils.copyProperties(t,datas);
- listData.add(t);
- logger.info(datas.toString());
- }
- return listData;
- }
- public List<String> search(String index, SearchSourceBuilder queryStr) throws Exception {
- SearchRequest request = new SearchRequest().indices(index).source(queryStr);
- //带入请求执行查询
- SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- //得到查询结果
- SearchHits hits = searchResponse.getHits();
- SearchHit[] searchHits = hits.getHits();
- List<String> listData = new ArrayList<>();
- //遍历查询结果
- for(SearchHit hit : searchHits){
- String datas = hit.getSourceAsString();
- String id = hit.getId();
- JSONObject object = JSONObject.parseObject(datas);
- object.put("id",id);
- listData.add(object.toJSONString());
- }
- return listData;
- }
- public int esCount(String index, SearchSourceBuilder queryStr) throws Exception{
- SearchRequest request = new SearchRequest().indices(index).source(queryStr);
- //带入请求执行查询
- SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- //得到查询结果
- SearchHits hits = searchResponse.getHits();
- SearchHit[] searchHits = hits.getHits();
- return searchHits.length;
- }
- public String search(String index, String id) throws IOException {
- GetRequest request = new GetRequest(index, id.toString());
- GetResponse getResponse = restHighLevelClient.get(request, RequestOptions.DEFAULT);
- return getResponse.getSourceAsString();
- }
- public static Object mapToObject(Map<String, Object> map, Class<?> beanClass) throws Exception {
- if (map == null)
- return null;
- Object obj = beanClass.newInstance();
- Field[] fields = obj.getClass().getDeclaredFields();
- for (Field field : fields) {
- int mod = field.getModifiers();
- if(Modifier.isStatic(mod) || Modifier.isFinal(mod)){
- continue;
- }
- field.setAccessible(true);
- field.set(obj, map.get(field.getName()));
- }
- return obj;
- }
- }
|