ElasticSearch7Helper.java 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package com.yihu.jw.elasticsearch;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.yihu.jw.elasticsearch.model.ESIDEntity;
  5. import io.searchbox.client.JestClient;
  6. import io.searchbox.client.JestResult;
  7. import io.searchbox.core.*;
  8. import org.elasticsearch.action.bulk.BulkRequest;
  9. import org.elasticsearch.action.bulk.BulkResponse;
  10. import org.elasticsearch.action.delete.DeleteRequest;
  11. import org.elasticsearch.action.get.GetRequest;
  12. import org.elasticsearch.action.get.GetResponse;
  13. import org.elasticsearch.action.index.IndexRequest;
  14. import org.elasticsearch.action.index.IndexResponse;
  15. import org.elasticsearch.action.search.SearchRequest;
  16. import org.elasticsearch.action.search.SearchResponse;
  17. import org.elasticsearch.action.update.UpdateRequest;
  18. import org.elasticsearch.action.update.UpdateResponse;
  19. import org.elasticsearch.client.RequestOptions;
  20. import org.elasticsearch.client.RestHighLevelClient;
  21. import org.elasticsearch.common.unit.TimeValue;
  22. import org.elasticsearch.common.xcontent.XContentType;
  23. import org.elasticsearch.rest.RestStatus;
  24. import org.elasticsearch.search.SearchHit;
  25. import org.elasticsearch.search.SearchHits;
  26. import org.elasticsearch.search.builder.SearchSourceBuilder;
  27. import org.slf4j.Logger;
  28. import org.slf4j.LoggerFactory;
  29. import org.springframework.beans.BeanUtils;
  30. import org.springframework.beans.factory.annotation.Autowired;
  31. import org.springframework.stereotype.Component;
  32. import javax.annotation.Resource;
  33. import java.io.IOException;
  34. import java.lang.reflect.Field;
  35. import java.lang.reflect.Modifier;
  36. import java.util.*;
  37. /**
  38. * Utils - 主要基于(jestClient)的搜索工具
  39. * Created by chenweida on 2017/6/2.
  40. */
  41. @Component
  42. public class ElasticSearch7Helper {
  43. private Logger logger = LoggerFactory.getLogger(ElasticSearch7Helper.class);
  44. @Resource(name="esClient")
  45. private RestHighLevelClient restHighLevelClient;
  46. public <T> Boolean save(String index, List<T> sources) throws IOException {
  47. BulkRequest bulkRequest = new BulkRequest();
  48. bulkRequest.timeout(TimeValue.timeValueSeconds(10));
  49. for (int i = 0; i < sources.size(); i++) {
  50. bulkRequest.add(new IndexRequest(index)
  51. // 不指定ID的话,新增时ID是随机的
  52. // .id(items.get(i).getId().toString())
  53. .source(JSON.toJSONString(sources.get(i)), XContentType.JSON)
  54. );
  55. // bulkRequest.add(UpdateRequest) 批量更新
  56. // bulkRequest.add(DeleteRequest) 批量删除
  57. }
  58. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  59. logger.info(bulkResponse.buildFailureMessage());
  60. return !bulkResponse.hasFailures();
  61. }
  62. public RestStatus save (String index, String source) throws IOException {
  63. IndexRequest indexRequest = new IndexRequest(index);
  64. indexRequest.timeout(TimeValue.timeValueSeconds(1));
  65. indexRequest.source(source, XContentType.JSON);
  66. IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  67. return indexResponse.status();
  68. }
  69. /**
  70. * 自定义ID
  71. * @param index
  72. * @param source
  73. * @param idFieldString
  74. * @return
  75. */
  76. public RestStatus saveWithCustomId (String index, String source, String idFieldString) throws IOException {
  77. IndexRequest indexRequest = new IndexRequest(index);
  78. indexRequest.timeout(TimeValue.timeValueSeconds(1));
  79. indexRequest.id(idFieldString).source(source, XContentType.JSON);
  80. IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  81. return indexResponse.status();
  82. }
  83. /**
  84. * 自定义ID
  85. *
  86. * @param index
  87. * @param sources
  88. * @param idFieldString
  89. * @return
  90. */
  91. public Boolean saveBulkWithCustomId (String index, List<String> sources, String idFieldString) throws IOException {
  92. BulkRequest bulkRequest = new BulkRequest();
  93. bulkRequest.timeout(TimeValue.timeValueSeconds(10));
  94. for (int i = 0; i < sources.size(); i++) {
  95. JSONObject jsonObject = JSON.parseObject(sources.get(i));
  96. bulkRequest.add(new IndexRequest(index)
  97. .id(jsonObject.getString(idFieldString))
  98. .source(JSON.toJSONString(sources.get(i)), XContentType.JSON)
  99. );
  100. }
  101. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  102. logger.info(bulkResponse.buildFailureMessage());
  103. return !bulkResponse.hasFailures();
  104. }
  105. /**
  106. * 更新
  107. * @param index
  108. * @param sources
  109. * @return
  110. */
  111. public <T> Boolean update (String index, List<T> sources) throws IOException {
  112. BulkRequest bulkRequest = new BulkRequest();
  113. bulkRequest.timeout(TimeValue.timeValueSeconds(10));
  114. for (int i = 0; i < sources.size(); i++) {
  115. bulkRequest.add(new UpdateRequest(index,((ESIDEntity) sources.get(i)).getId())
  116. .doc(JSON.toJSONString(sources.get(i)), XContentType.JSON)
  117. );
  118. }
  119. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  120. logger.info(bulkResponse.buildFailureMessage());
  121. return !bulkResponse.hasFailures();
  122. }
  123. /**
  124. * 更新
  125. * @param index
  126. * @param type
  127. * @param list
  128. * @return
  129. */
  130. public Boolean updateByMap(String index, String type, List<Map<String, Object>> list) throws IOException {
  131. BulkRequest bulkRequest = new BulkRequest();
  132. bulkRequest.timeout(TimeValue.timeValueSeconds(10));
  133. for (int i = 0; i < list.size(); i++) {
  134. Map map = list.get(i);
  135. bulkRequest.add(new UpdateRequest(index,String.valueOf(map.get("id")))
  136. .doc(JSON.toJSONString(map), XContentType.JSON)
  137. );
  138. }
  139. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  140. logger.info(bulkResponse.buildFailureMessage());
  141. return !bulkResponse.hasFailures();
  142. }
  143. /**
  144. * 更新
  145. * @param index
  146. * @param _id
  147. * @param source
  148. * @return
  149. */
  150. public boolean update(String index, String _id, JSONObject source) throws IOException {
  151. UpdateRequest updateRequest = new UpdateRequest(index, _id);
  152. updateRequest.timeout(TimeValue.timeValueSeconds(1));
  153. updateRequest.doc(source.toJSONString(), XContentType.JSON);
  154. UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
  155. logger.debug("update info: " + updateResponse.status());
  156. return true;
  157. }
  158. /**
  159. * 删除
  160. */
  161. public boolean delete(String index, String type, List<Map<String, Object>> datas) throws IOException {
  162. BulkRequest bulkRequest = new BulkRequest();
  163. bulkRequest.timeout(TimeValue.timeValueSeconds(10));
  164. for (Map map : datas) {
  165. bulkRequest.add(new DeleteRequest(index)
  166. .id(map.get("_id").toString()));
  167. }
  168. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  169. logger.info(bulkResponse.buildFailureMessage());
  170. return !bulkResponse.hasFailures();
  171. }
  172. public <T> List<T> search(String index, SearchSourceBuilder queryStr,Class<T> beanClass) throws Exception {
  173. SearchRequest request = new SearchRequest().indices(index).source(queryStr);
  174. //带入请求执行查询
  175. SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  176. //得到查询结果
  177. SearchHits hits = searchResponse.getHits();
  178. SearchHit[] searchHits = hits.getHits();
  179. List<T> listData = new ArrayList<>();
  180. //遍历查询结果
  181. for(SearchHit hit : searchHits){
  182. Map<String,Object> datas = hit.getSourceAsMap();
  183. T t = beanClass.newInstance();
  184. BeanUtils.copyProperties(t,datas);
  185. listData.add(t);
  186. logger.info(datas.toString());
  187. }
  188. return listData;
  189. }
  190. public List<String> search(String index, SearchSourceBuilder queryStr) throws Exception {
  191. SearchRequest request = new SearchRequest().indices(index).source(queryStr);
  192. //带入请求执行查询
  193. SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  194. //得到查询结果
  195. SearchHits hits = searchResponse.getHits();
  196. SearchHit[] searchHits = hits.getHits();
  197. List<String> listData = new ArrayList<>();
  198. //遍历查询结果
  199. for(SearchHit hit : searchHits){
  200. String datas = hit.getSourceAsString();
  201. listData.add(datas);
  202. logger.info(datas);
  203. }
  204. return listData;
  205. }
  206. public int esCount(String index, SearchSourceBuilder queryStr) throws Exception{
  207. SearchRequest request = new SearchRequest().indices(index).source(queryStr);
  208. //带入请求执行查询
  209. SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  210. //得到查询结果
  211. SearchHits hits = searchResponse.getHits();
  212. SearchHit[] searchHits = hits.getHits();
  213. return searchHits.length;
  214. }
  215. public String search(String index, String id) throws IOException {
  216. GetRequest request = new GetRequest(index, id.toString());
  217. GetResponse getResponse = restHighLevelClient.get(request, RequestOptions.DEFAULT);
  218. return getResponse.getSourceAsString();
  219. }
  220. public static Object mapToObject(Map<String, Object> map, Class<?> beanClass) throws Exception {
  221. if (map == null)
  222. return null;
  223. Object obj = beanClass.newInstance();
  224. Field[] fields = obj.getClass().getDeclaredFields();
  225. for (Field field : fields) {
  226. int mod = field.getModifiers();
  227. if(Modifier.isStatic(mod) || Modifier.isFinal(mod)){
  228. continue;
  229. }
  230. field.setAccessible(true);
  231. field.set(obj, map.get(field.getName()));
  232. }
  233. return obj;
  234. }
  235. }