ElasticSearchHelper.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. package com.yihu.elasticsearch;
  2. import com.alibaba.druid.sql.ast.SQLExpr;
  3. import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
  4. import com.alibaba.druid.sql.parser.SQLExprParser;
  5. import com.alibaba.fastjson.JSONObject;
  6. import com.fasterxml.jackson.core.JsonProcessingException;
  7. import com.fasterxml.jackson.databind.ObjectMapper;
  8. import com.yihu.elasticsearch.ElasticSearchPool;
  9. import io.searchbox.client.JestClient;
  10. import io.searchbox.client.JestResult;
  11. import io.searchbox.core.*;
  12. import org.apache.commons.collections.CollectionUtils;
  13. import org.elasticsearch.action.search.SearchResponse;
  14. import org.elasticsearch.client.Client;
  15. import org.nlpcn.es4sql.domain.Select;
  16. import org.nlpcn.es4sql.jdbc.ObjectResult;
  17. import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
  18. import org.nlpcn.es4sql.parse.ElasticSqlExprParser;
  19. import org.nlpcn.es4sql.parse.SqlParser;
  20. import org.nlpcn.es4sql.query.AggregationQueryAction;
  21. import org.nlpcn.es4sql.query.DefaultQueryAction;
  22. import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder;
  23. import org.slf4j.Logger;
  24. import org.slf4j.LoggerFactory;
  25. import org.springframework.beans.factory.annotation.Autowired;
  26. import org.springframework.beans.factory.annotation.Value;
  27. import org.springframework.context.annotation.Scope;
  28. import org.springframework.stereotype.Component;
  29. import javax.annotation.PostConstruct;
  30. import java.io.BufferedReader;
  31. import java.io.IOException;
  32. import java.io.InputStreamReader;
  33. import java.text.DecimalFormat;
  34. import java.util.*;
  35. ;
  36. /**
  37. * Created by chenweida on 2017/6/2.
  38. */
  39. @Component
  40. @Scope("prototype")
  41. public class ElasticSearchHelper {
  42. private Logger logger = LoggerFactory.getLogger(ElasticSearchHelper.class);
  43. @Autowired
  44. private ElasticSearchPool elasticSearchPool;
  45. private JestClient jestClient;
  46. @Value("${es.host}")
  47. private String esHost;
  48. @Autowired
  49. private ObjectMapper objectMapper;
  50. private static String[] curlCmds = new String[6];
  51. @PostConstruct
  52. public void init() {
  53. jestClient = elasticSearchPool.getJestClient();
  54. // curl命令格式
  55. String url = esHost.split(",")[0].replace("http://", "").concat("/_sql");
  56. curlCmds[0] = "curl";
  57. curlCmds[1] = url;
  58. curlCmds[2] = "-H";
  59. curlCmds[3] = "Content-Type: application/json";
  60. curlCmds[4] = "-d";
  61. }
  62. public <T> Boolean save(String index, String type, List<T> sources) throws IOException {
  63. Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
  64. sources.forEach((item) -> {
  65. Index indexObj = (new Index.Builder(item)).build();
  66. bulk.addAction(indexObj);
  67. });
  68. BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
  69. this.logger.debug("save flag: " + br.isSucceeded());
  70. return br.isSucceeded();
  71. }
  72. public Boolean save (String index, String type, String source) throws IOException {
  73. Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
  74. Index indexObj = new Index.Builder(source).build();
  75. bulk.addAction(indexObj);
  76. BulkResult br = jestClient.execute(bulk.build());
  77. logger.debug("save flag: " + br.isSucceeded());
  78. return br.isSucceeded();
  79. }
  80. public Boolean save(String index, String type, String source,String errMsg) throws IOException {
  81. Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
  82. Index indexObj = (new Index.Builder(source)).build();
  83. bulk.addAction(indexObj);
  84. BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
  85. if(!br.isSucceeded()){
  86. this.logger.error("save flag: " + br.isSucceeded());
  87. errMsg = br.getErrorMessage();
  88. }
  89. return br.isSucceeded();
  90. }
  91. public Boolean saveWithCustomId(String index, String type, String source, String idFieldString,StringBuilder errMsg) throws IOException {
  92. Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
  93. JSONObject jsonObject = (JSONObject)(JSONObject.parse(source));
  94. Index indexObj = ((Index.Builder)(new Index.Builder(source)).id(jsonObject.getString(idFieldString))).build();
  95. bulk.addAction(indexObj);
  96. BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
  97. if(!br.isSucceeded()){
  98. errMsg.append(br.getJsonString());
  99. this.logger.error("save flag: " + br.isSucceeded() + " " + errMsg.toString());
  100. }
  101. return br.isSucceeded();
  102. }
  103. public Boolean saveBulkWithCustomId(String index, String type, List<String> sources, String idFieldString) throws IOException {
  104. Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
  105. sources.forEach((item) -> {
  106. JSONObject jsonObject = (JSONObject)((JSONObject)JSONObject.parse(item));
  107. Index indexObj = ((Index.Builder)(new Index.Builder(item)).id(jsonObject.getString(idFieldString))).build();
  108. bulk.addAction(indexObj);
  109. });
  110. BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
  111. this.logger.debug("save flag: " + br.isSucceeded());
  112. return br.isSucceeded();
  113. }
  114. public <T> Boolean update(String index, String type, List<T> sources) throws IOException {
  115. Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
  116. sources.forEach((item) -> {
  117. JSONObject jo = new JSONObject();
  118. jo.put("doc", item);
  119. // Update indexObj = ((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)((Builder)(new Builder(jo.toString())).index(index)).type(type)).id((item).getId())).build();
  120. // bulk.addAction(indexObj);
  121. });
  122. BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
  123. this.logger.debug("update flag: " + br.isSucceeded());
  124. return br.isSucceeded();
  125. }
  126. public Boolean updateByMap(String index, String type, List<Map<String, Object>> list) throws IOException {
  127. Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
  128. Iterator var10 = list.iterator();
  129. while(var10.hasNext()) {
  130. Map map = (Map)var10.next();
  131. JSONObject jo = new JSONObject();
  132. jo.put("doc", map);
  133. Update indexObj = ((Update.Builder)((Update.Builder)((Update.Builder)(new Update.Builder(jo.toString())).index(index)).type(type)).id(String.valueOf(map.get("id")))).build();
  134. bulk.addAction(indexObj);
  135. }
  136. BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
  137. this.logger.debug("update flag: " + br.isSucceeded());
  138. return br.isSucceeded();
  139. }
  140. public boolean update(String index, String type, String _id, JSONObject source) throws IOException {
  141. JSONObject docSource = new JSONObject();
  142. docSource.put("doc", source);
  143. Update update = ((Update.Builder)((Update.Builder)((Update.Builder)(new Update.Builder(docSource)).index(index)).type(type)).id(_id)).build();
  144. JestResult jestResult = this.jestClient.execute(update);
  145. this.logger.debug("update info: " + jestResult.isSucceeded());
  146. return true;
  147. }
  148. public boolean delete(String index, String type, List<Map<String, Object>> datas) throws IOException {
  149. Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
  150. Iterator var5 = datas.iterator();
  151. while(var5.hasNext()) {
  152. Map map = (Map)var5.next();
  153. if (map.containsKey("id") && map.containsKey("_id")) {
  154. Delete indexObj = null;
  155. if (null != map.get("_id")) {
  156. indexObj = (new Delete.Builder(map.get("_id").toString())).build();
  157. } else if (null != map.get("id")) {
  158. indexObj = (new Delete.Builder(map.get("id").toString())).build();
  159. }
  160. bulk.addAction(indexObj);
  161. }
  162. }
  163. BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
  164. this.logger.debug("delete data count: " + datas.size());
  165. this.logger.debug("delete flag: " + br.isSucceeded());
  166. return br.isSucceeded();
  167. }
  168. public SearchResult search(String index, String type, String queryStr) throws IOException {
  169. Search search = ((Search.Builder)((Search.Builder)(new Search.Builder(queryStr)).addIndex(index)).addType(type)).build();
  170. SearchResult result = (SearchResult)this.jestClient.execute(search);
  171. this.logger.info("search data count: " + result.getTotal());
  172. return result;
  173. }
  174. public List<Map<String, Object>> executeSQL(String sql) throws Exception {
  175. List<Map<String, Object>> returnModels = new ArrayList();
  176. SQLExprParser parser = new ElasticSqlExprParser(sql);
  177. SQLExpr expr = parser.expr();
  178. SQLQueryExpr queryExpr = (SQLQueryExpr)expr;
  179. // SQLBinaryExpr queryExpr = (SQLBinaryExpr)expr;
  180. Select select = (new SqlParser()).parseSelect(queryExpr);
  181. AggregationQueryAction action = null;
  182. DefaultQueryAction queryAction = null;
  183. SqlElasticSearchRequestBuilder requestBuilder = null;
  184. if (select.isAgg) {
  185. action = new AggregationQueryAction(this.elasticSearchPool.getClient(), select);
  186. requestBuilder = action.explain();
  187. } else {
  188. Client client = this.elasticSearchPool.getClient();
  189. queryAction = new DefaultQueryAction(client, select);
  190. requestBuilder = queryAction.explain();
  191. }
  192. SearchResponse response = (SearchResponse)requestBuilder.get();
  193. Object queryResult = null;
  194. if (sql.toUpperCase().indexOf("GROUP") == -1 && sql.toUpperCase().indexOf("SUM") == -1 && sql.toUpperCase().indexOf("COUNT(") == -1) {
  195. queryResult = response.getHits();
  196. } else {
  197. queryResult = response.getAggregations();
  198. }
  199. ObjectResult temp = (new ObjectResultsExtractor(true, true, true)).extractResults(queryResult, true);
  200. List<String> heads = temp.getHeaders();
  201. temp.getLines().stream().forEach((one) -> {
  202. Map<String, Object> oneMap = new HashMap();
  203. for(int i = 0; i < one.size(); ++i) {
  204. Object value = one.get(i);
  205. String key = (String)heads.get(i);
  206. oneMap.put(key, value);
  207. }
  208. returnModels.add(oneMap);
  209. });
  210. return returnModels;
  211. }
  212. public Integer executeCountSQL(String sql) throws Exception {
  213. SQLQueryExpr queryExpr = new SQLQueryExpr();
  214. Select select = (new SqlParser()).parseSelect(queryExpr);
  215. SqlElasticSearchRequestBuilder requestBuilder;
  216. if (select.isAgg) {
  217. AggregationQueryAction action = new AggregationQueryAction(this.elasticSearchPool.getClient(), select);
  218. requestBuilder = action.explain();
  219. } else {
  220. Client client = this.elasticSearchPool.getClient();
  221. DefaultQueryAction queryAction = new DefaultQueryAction(client, select);
  222. requestBuilder = queryAction.explain();
  223. }
  224. SearchResponse response = (SearchResponse)requestBuilder.get();
  225. Object queryResult;
  226. if (sql.toUpperCase().indexOf("GROUP") == -1 && sql.toUpperCase().indexOf("SUM") == -1 && sql.toUpperCase().indexOf("COUNT(") == -1) {
  227. queryResult = response.getHits();
  228. } else {
  229. queryResult = response.getAggregations();
  230. }
  231. ObjectResult temp = (new ObjectResultsExtractor(true, true, true)).extractResults(queryResult, true);
  232. for(int j = 0; j < temp.getLines().size(); ++j) {
  233. List<Object> one = (List)temp.getLines().get(j);
  234. for(int i = 0; i < one.size(); ++i) {
  235. Object value = one.get(i);
  236. if (value instanceof Double) {
  237. Double valueTemp = (Double)value;
  238. DecimalFormat df = new DecimalFormat("######0");
  239. return Integer.parseInt(df.format(valueTemp));
  240. }
  241. }
  242. }
  243. return 0;
  244. }
  245. /**
  246. * 使用curl方式执行 es-sql 查询操作
  247. * @param sql
  248. * @return
  249. */
  250. public List<JSONObject> execCurl(String sql){
  251. List<JSONObject> result = new ArrayList<>();
  252. curlCmds[5] = sql;
  253. ProcessBuilder process = new ProcessBuilder(curlCmds);
  254. Process p;
  255. StringBuilder builder = new StringBuilder();
  256. try {
  257. p = process.start();
  258. BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
  259. String line = null;
  260. while ((line = reader.readLine()) != null) {
  261. builder.append(line);
  262. builder.append(System.getProperty("line.separator"));
  263. }
  264. } catch (IOException e) {
  265. System.out.print("error");
  266. e.printStackTrace();
  267. }
  268. try {
  269. logger.info("request curlCmds:" + objectMapper.writeValueAsString(curlCmds));
  270. } catch (JsonProcessingException e) {
  271. e.printStackTrace();
  272. }
  273. String esResponse = builder.toString();
  274. logger.info("esResponse:" + esResponse);
  275. JSONObject esResObj = JSONObject.parseObject(esResponse);
  276. if(null != esResObj){
  277. if(null == (esResObj.get("error"))){
  278. if(esResObj.getJSONObject("_shards").getIntValue("total") > 0){
  279. List resList = JSONObject.parseObject(esResponse).getJSONObject("hits").getJSONArray("hits");
  280. if(!CollectionUtils.isEmpty(resList)){
  281. resList.forEach(
  282. one->{
  283. JSONObject oneHit = (JSONObject)one;
  284. result.add(oneHit.getJSONObject("_source"));
  285. }
  286. );
  287. }
  288. }
  289. }
  290. }
  291. logger.info("execCurl result:"+result.toString());
  292. return result;
  293. }
  294. }