ElasticSearch7Helper.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. package com.yihu.jw.elasticsearch;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.yihu.jw.elasticsearch.model.ESIDEntity;
  5. import org.elasticsearch.action.bulk.BulkRequest;
  6. import org.elasticsearch.action.bulk.BulkResponse;
  7. import org.elasticsearch.action.delete.DeleteRequest;
  8. import org.elasticsearch.action.get.GetRequest;
  9. import org.elasticsearch.action.get.GetResponse;
  10. import org.elasticsearch.action.index.IndexRequest;
  11. import org.elasticsearch.action.index.IndexResponse;
  12. import org.elasticsearch.action.search.SearchRequest;
  13. import org.elasticsearch.action.search.SearchResponse;
  14. import org.elasticsearch.action.update.UpdateRequest;
  15. import org.elasticsearch.action.update.UpdateResponse;
  16. import org.elasticsearch.client.RequestOptions;
  17. import org.elasticsearch.client.RestHighLevelClient;
  18. import org.elasticsearch.common.unit.TimeValue;
  19. import org.elasticsearch.common.xcontent.XContentType;
  20. import org.elasticsearch.rest.RestStatus;
  21. import org.elasticsearch.search.SearchHit;
  22. import org.elasticsearch.search.SearchHits;
  23. import org.elasticsearch.search.builder.SearchSourceBuilder;
  24. import org.slf4j.Logger;
  25. import org.slf4j.LoggerFactory;
  26. import org.springframework.beans.BeanUtils;
  27. import org.springframework.stereotype.Component;
  28. import javax.annotation.Resource;
  29. import java.io.IOException;
  30. import java.lang.reflect.Field;
  31. import java.lang.reflect.Modifier;
  32. import java.util.ArrayList;
  33. import java.util.List;
  34. import java.util.Map;
  /**
   * Utils - Elasticsearch 7 helper built on {@code RestHighLevelClient}
   * (single/bulk indexing, updates, deletes and searches).
   * Created by chenweida on 2017/6/2.
   */
  39. @Component
  40. public class ElasticSearch7Helper {
  41. private Logger logger = LoggerFactory.getLogger(ElasticSearch7Helper.class);
  42. @Resource(name="restHighLevelClient")
  43. private RestHighLevelClient restHighLevelClient;
  44. public <T> Boolean saveOne(String index, T t) throws IOException {
  45. BulkRequest bulkRequest = new BulkRequest();
  46. bulkRequest.timeout(TimeValue.timeValueSeconds(60));
  47. bulkRequest.add(new IndexRequest(index)
  48. // 不指定ID的话,新增时ID是随机的
  49. // .id(items.get(i).getId().toString())
  50. .source(JSON.toJSONString(t), XContentType.JSON)
  51. );
  52. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  53. logger.info(bulkResponse.buildFailureMessage());
  54. return !bulkResponse.hasFailures();
  55. }
  56. public <T> Boolean save(String index, List<T> sources) throws IOException {
  57. BulkRequest bulkRequest = new BulkRequest();
  58. bulkRequest.timeout(TimeValue.timeValueSeconds(60));//超时时间
  59. for (int i = 0; i < sources.size(); i++) {
  60. bulkRequest.add(new IndexRequest(index)
  61. // 不指定ID的话,新增时ID是随机的
  62. // .id(items.get(i).getId().toString())
  63. .source(JSON.toJSONString(sources.get(i)), XContentType.JSON)
  64. );
  65. // bulkRequest.add(UpdateRequest) 批量更新
  66. // bulkRequest.add(DeleteRequest) 批量删除
  67. }
  68. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  69. logger.info(bulkResponse.buildFailureMessage());
  70. return !bulkResponse.hasFailures();
  71. }
  72. public RestStatus save (String index, String source) throws IOException {
  73. IndexRequest indexRequest = new IndexRequest(index);
  74. indexRequest.timeout(TimeValue.timeValueSeconds(10));
  75. indexRequest.source(source, XContentType.JSON);
  76. IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  77. return indexResponse.status();
  78. }
  79. /**
  80. * 自定义ID
  81. * @param index
  82. * @param source
  83. * @param idFieldString
  84. * @return
  85. */
  86. public RestStatus saveWithCustomId (String index, String source, String idFieldString) throws IOException {
  87. IndexRequest indexRequest = new IndexRequest(index);
  88. indexRequest.timeout(TimeValue.timeValueSeconds(10));
  89. indexRequest.id(idFieldString).source(source, XContentType.JSON);
  90. IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  91. return indexResponse.status();
  92. }
  93. /**
  94. * 自定义ID
  95. *
  96. * @param index
  97. * @param sources
  98. * @param idFieldString
  99. * @return
  100. */
  101. public Boolean saveBulkWithCustomId (String index, List<String> sources, String idFieldString) throws IOException {
  102. BulkRequest bulkRequest = new BulkRequest();
  103. bulkRequest.timeout(TimeValue.timeValueSeconds(10));
  104. for (int i = 0; i < sources.size(); i++) {
  105. JSONObject jsonObject = JSON.parseObject(sources.get(i));
  106. bulkRequest.add(new IndexRequest(index)
  107. .id(jsonObject.getString(idFieldString))
  108. .source(JSON.toJSONString(sources.get(i)), XContentType.JSON)
  109. );
  110. }
  111. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  112. logger.info(bulkResponse.buildFailureMessage());
  113. return !bulkResponse.hasFailures();
  114. }
  115. /**
  116. * 更新
  117. * @param index
  118. * @param sources
  119. * @return
  120. */
  121. public <T> Boolean update (String index, List<T> sources) throws IOException {
  122. BulkRequest bulkRequest = new BulkRequest();
  123. bulkRequest.timeout(TimeValue.timeValueSeconds(60));
  124. for (int i = 0; i < sources.size(); i++) {
  125. bulkRequest.add(new UpdateRequest(index,((ESIDEntity) sources.get(i)).getId())
  126. .doc(JSON.toJSONString(sources.get(i)), XContentType.JSON)
  127. );
  128. }
  129. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  130. logger.info(bulkResponse.buildFailureMessage());
  131. return !bulkResponse.hasFailures();
  132. }
  133. /**
  134. * 更新
  135. * @param index
  136. * @param type
  137. * @param list
  138. * @return
  139. */
  140. public Boolean updateByMap(String index, String type, List<Map<String, Object>> list) throws IOException {
  141. BulkRequest bulkRequest = new BulkRequest();
  142. bulkRequest.timeout(TimeValue.timeValueSeconds(10));
  143. for (int i = 0; i < list.size(); i++) {
  144. Map map = list.get(i);
  145. bulkRequest.add(new UpdateRequest(index,String.valueOf(map.get("id")))
  146. .doc(JSON.toJSONString(map), XContentType.JSON)
  147. );
  148. }
  149. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  150. logger.info(bulkResponse.buildFailureMessage());
  151. return !bulkResponse.hasFailures();
  152. }
  153. /**
  154. * 更新
  155. * @param index
  156. * @param _id
  157. * @param source
  158. * @return
  159. */
  160. public boolean update(String index, String _id, JSONObject source) throws IOException {
  161. UpdateRequest updateRequest = new UpdateRequest(index, _id);
  162. updateRequest.timeout(TimeValue.timeValueSeconds(10));
  163. updateRequest.doc(source.toJSONString(), XContentType.JSON);
  164. UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
  165. logger.debug("update info: " + updateResponse.status());
  166. return true;
  167. }
  168. /**
  169. * 删除
  170. */
  171. public boolean delete(String index, List<Map<String, Object>> datas) throws IOException {
  172. BulkRequest bulkRequest = new BulkRequest();
  173. bulkRequest.timeout(TimeValue.timeValueSeconds(60));
  174. for (Map map : datas) {
  175. bulkRequest.add(new DeleteRequest(index)
  176. .id(map.get("_id").toString()));
  177. }
  178. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  179. logger.info(bulkResponse.buildFailureMessage());
  180. return !bulkResponse.hasFailures();
  181. }
  182. public <T> T searchOne(String index, SearchSourceBuilder queryStr,Class<T> beanClass) throws Exception {
  183. SearchRequest request = new SearchRequest().indices(index).source(queryStr);
  184. //带入请求执行查询
  185. SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  186. //得到查询结果
  187. SearchHits hits = searchResponse.getHits();
  188. SearchHit[] searchHits = hits.getHits();
  189. if(searchHits.length>0){
  190. Map<String,Object> datas = hits.getAt(0).getSourceAsMap();
  191. T t = beanClass.newInstance();
  192. BeanUtils.copyProperties(t,datas);
  193. return t;
  194. }
  195. return null;
  196. }
  197. public <T> List<T> search(String index, SearchSourceBuilder queryStr,Class<T> beanClass) throws Exception {
  198. SearchRequest request = new SearchRequest().indices(index).source(queryStr);
  199. //带入请求执行查询
  200. SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  201. //得到查询结果
  202. SearchHits hits = searchResponse.getHits();
  203. SearchHit[] searchHits = hits.getHits();
  204. List<T> listData = new ArrayList<>();
  205. //遍历查询结果
  206. for(SearchHit hit : searchHits){
  207. Map<String,Object> datas = hit.getSourceAsMap();
  208. T t = beanClass.newInstance();
  209. BeanUtils.copyProperties(t,datas);
  210. listData.add(t);
  211. logger.info(datas.toString());
  212. }
  213. return listData;
  214. }
  215. public List<String> search(String index, SearchSourceBuilder queryStr) throws Exception {
  216. SearchRequest request = new SearchRequest().indices(index).source(queryStr);
  217. //带入请求执行查询
  218. SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  219. //得到查询结果
  220. SearchHits hits = searchResponse.getHits();
  221. SearchHit[] searchHits = hits.getHits();
  222. List<String> listData = new ArrayList<>();
  223. //遍历查询结果
  224. for(SearchHit hit : searchHits){
  225. String datas = hit.getSourceAsString();
  226. String id = hit.getId();
  227. JSONObject object = JSONObject.parseObject(datas);
  228. object.put("id",id);
  229. listData.add(object.toJSONString());
  230. }
  231. return listData;
  232. }
  233. public int esCount(String index, SearchSourceBuilder queryStr) throws Exception{
  234. SearchRequest request = new SearchRequest().indices(index).source(queryStr);
  235. //带入请求执行查询
  236. SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  237. //得到查询结果
  238. SearchHits hits = searchResponse.getHits();
  239. SearchHit[] searchHits = hits.getHits();
  240. return searchHits.length;
  241. }
  242. public String search(String index, String id) throws IOException {
  243. GetRequest request = new GetRequest(index, id.toString());
  244. GetResponse getResponse = restHighLevelClient.get(request, RequestOptions.DEFAULT);
  245. return getResponse.getSourceAsString();
  246. }
  247. public static Object mapToObject(Map<String, Object> map, Class<?> beanClass) throws Exception {
  248. if (map == null)
  249. return null;
  250. Object obj = beanClass.newInstance();
  251. Field[] fields = obj.getClass().getDeclaredFields();
  252. for (Field field : fields) {
  253. int mod = field.getModifiers();
  254. if(Modifier.isStatic(mod) || Modifier.isFinal(mod)){
  255. continue;
  256. }
  257. field.setAccessible(true);
  258. field.set(obj, map.get(field.getName()));
  259. }
  260. return obj;
  261. }
  262. }