package com.yihu.elasticsearch; import com.alibaba.druid.sql.ast.SQLExpr; import com.alibaba.druid.sql.ast.expr.SQLQueryExpr; import com.alibaba.druid.sql.parser.SQLExprParser; import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.yihu.elasticsearch.ElasticSearchPool; import io.searchbox.client.JestClient; import io.searchbox.client.JestResult; import io.searchbox.core.*; import org.apache.commons.collections.CollectionUtils; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.nlpcn.es4sql.domain.Select; import org.nlpcn.es4sql.jdbc.ObjectResult; import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor; import org.nlpcn.es4sql.parse.ElasticSqlExprParser; import org.nlpcn.es4sql.parse.SqlParser; import org.nlpcn.es4sql.query.AggregationQueryAction; import org.nlpcn.es4sql.query.DefaultQueryAction; import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.text.DecimalFormat; import java.util.*; ; /** * Created by chenweida on 2017/6/2. 
* <p>Prototype-scoped Spring component that talks to Elasticsearch in three ways: bulk
* index / update / delete / search through a Jest client taken from {@link ElasticSearchPool},
* SQL statements through es4sql, and SQL posted to the {@code /_sql} endpoint by spawning an
* external {@code curl} process.
*/
@Component
@Scope("prototype")
public class ElasticSearchHelper {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchHelper.class);

    @Autowired
    private ElasticSearchPool elasticSearchPool;

    private JestClient jestClient;

    @Value("${es.host}")
    private String esHost;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Target of {@link #execCurl(String)}: first entry of {@code es.host} without the
     * {@code http://} prefix, followed by {@code /_sql}. Kept per instance (the curl command is
     * assembled per call) so concurrent callers cannot overwrite each other's SQL.
     */
    private String sqlEndpoint;

    /**
     * Fetches the Jest client from the pool and derives the curl target from {@code es.host}.
     */
    @PostConstruct
    public void init() {
        jestClient = elasticSearchPool.getJestClient();
        // es.host may be a comma separated list; only the first entry is used for curl.
        sqlEndpoint = esHost.split(",")[0].replace("http://", "").concat("/_sql");
    }

    /**
     * Indexes every element of {@code sources} in a single bulk request.
     *
     * @param index   default index of the bulk request
     * @param type    default type of the bulk request
     * @param sources documents handed unchanged to {@code Index.Builder}
     * @return whether Jest reports the bulk request as succeeded
     * @throws IOException if the Jest client fails to execute the request
     */
    public Boolean save(String index, String type, List<?> sources) throws IOException {
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        for (Object item : sources) {
            bulk.addAction(new Index.Builder(item).build());
        }
        BulkResult br = jestClient.execute(bulk.build());
        logger.debug("save flag: {}", br.isSucceeded());
        return br.isSucceeded();
    }

    /**
     * Indexes a single document through a one-element bulk request.
     *
     * @param index  default index of the bulk request
     * @param type   default type of the bulk request
     * @param source document handed to {@code Index.Builder}
     * @return whether Jest reports the bulk request as succeeded
     * @throws IOException if the Jest client fails to execute the request
     */
    public Boolean save(String index, String type, String source) throws IOException {
        return save(index, type, Collections.singletonList(source));
    }

    /**
     * Indexes a single document and logs the bulk error message on failure.
     *
     * @param index  default index of the bulk request
     * @param type   default type of the bulk request
     * @param source document handed to {@code Index.Builder}
     * @param errMsg kept for signature compatibility only; a {@code String} argument cannot carry
     *               a value back to the caller, so the error text is written to the log instead.
     *               Use {@link #saveWithCustomId} when the caller needs the error details.
     * @return whether Jest reports the bulk request as succeeded
     * @throws IOException if the Jest client fails to execute the request
     */
    public Boolean save(String index, String type, String source, String errMsg) throws IOException {
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        bulk.addAction(new Index.Builder(source).build());
        BulkResult br = jestClient.execute(bulk.build());
        if (!br.isSucceeded()) {
            logger.error("save flag: {} {}", br.isSucceeded(), br.getErrorMessage());
        }
        return br.isSucceeded();
    }

    /**
     * Indexes a single JSON document using the value of one of its own fields as document id.
     *
     * @param index         default index of the bulk request
     * @param type          default type of the bulk request
     * @param source        JSON text of the document
     * @param idFieldString name of the field in {@code source} whose value becomes the id
     * @param errMsg        receives the raw JSON of the bulk response on failure; may be null
     * @return whether Jest reports the bulk request as succeeded
     * @throws IOException if the Jest client fails to execute the request
     */
    public Boolean saveWithCustomId(String index, String type, String source,
                                    String idFieldString, StringBuilder errMsg) throws IOException {
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        JSONObject document = (JSONObject) JSONObject.parse(source);
        bulk.addAction(new Index.Builder(source).id(document.getString(idFieldString)).build());
        BulkResult br = jestClient.execute(bulk.build());
        if (!br.isSucceeded()) {
            String detail = br.getJsonString();
            if (errMsg != null) {
                errMsg.append(detail);
            }
            logger.error("save flag: {} {}", br.isSucceeded(), errMsg != null ? errMsg : detail);
        }
        return br.isSucceeded();
    }

    /**
     * Bulk variant of {@link #saveWithCustomId}: every JSON text in {@code sources} is indexed
     * under the id read from its {@code idFieldString} field.
     *
     * @param index         default index of the bulk request
     * @param type          default type of the bulk request
     * @param sources       JSON texts of the documents
     * @param idFieldString name of the field whose value becomes the document id
     * @return whether Jest reports the bulk request as succeeded
     * @throws IOException if the Jest client fails to execute the request
     */
    public Boolean saveBulkWithCustomId(String index, String type, List<String> sources,
                                        String idFieldString) throws IOException {
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        for (String item : sources) {
            JSONObject document = (JSONObject) JSONObject.parse(item);
            bulk.addAction(new Index.Builder(item).id(document.getString(idFieldString)).build());
        }
        BulkResult br = jestClient.execute(bulk.build());
        logger.debug("save flag: {}", br.isSucceeded());
        return br.isSucceeded();
    }

    /**
     * Executes a bulk request for {@code index}/{@code type} that contains no actions.
     *
     * <p>NOTE(review): the code that turned each element of {@code sources} into an update
     * action was already disabled in this class (it relied on an element type exposing
     * {@code getId()}), so {@code sources} is currently ignored. Use {@link #updateByMap} for
     * real partial updates, or restore the action building once the element type is known.
     *
     * @param index   default index of the bulk request
     * @param type    default type of the bulk request
     * @param sources currently unused
     * @return whether Jest reports the (empty) bulk request as succeeded
     * @throws IOException if the Jest client fails to execute the request
     */
    public Boolean update(String index, String type, List<?> sources) throws IOException {
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        BulkResult br = jestClient.execute(bulk.build());
        logger.debug("update flag: {}", br.isSucceeded());
        return br.isSucceeded();
    }

    /**
     * Sends one partial-update action per map; each map is wrapped as {@code {"doc": map}} and
     * addressed by the string form of its {@code "id"} entry.
     *
     * @param index index of every update action
     * @param type  type of every update action
     * @param list  field maps, each expected to carry an {@code "id"} entry
     * @return whether Jest reports the bulk request as succeeded
     * @throws IOException if the Jest client fails to execute the request
     */
    public Boolean updateByMap(String index, String type, List<Map<String, Object>> list) throws IOException {
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        for (Map<String, Object> map : list) {
            JSONObject payload = new JSONObject();
            payload.put("doc", map);
            Update action = new Update.Builder(payload.toString())
                    .index(index)
                    .type(type)
                    .id(String.valueOf(map.get("id")))
                    .build();
            bulk.addAction(action);
        }
        BulkResult br = jestClient.execute(bulk.build());
        logger.debug("update flag: {}", br.isSucceeded());
        return br.isSucceeded();
    }

    /**
     * Partially updates one document with the fields in {@code source}.
     *
     * @param index  index of the document
     * @param type   type of the document
     * @param _id    id of the document
     * @param source fields to merge, sent as {@code {"doc": source}}
     * @return whether Jest reports the update as succeeded (previously this was always
     *         {@code true}, which hid failed updates from callers)
     * @throws IOException if the Jest client fails to execute the request
     */
    public boolean update(String index, String type, String _id, JSONObject source) throws IOException {
        JSONObject docSource = new JSONObject();
        docSource.put("doc", source);
        Update update = new Update.Builder(docSource).index(index).type(type).id(_id).build();
        JestResult jestResult = jestClient.execute(update);
        logger.debug("update info: {}", jestResult.isSucceeded());
        return jestResult.isSucceeded();
    }

    /**
     * Deletes the documents identified by the given maps in one bulk request.
     *
     * <p>The id is taken from the {@code "_id"} entry and falls back to {@code "id"} when
     * {@code "_id"} is absent or null. Maps providing neither are skipped; before, such a map
     * could put a {@code null} action into the bulk request, and maps holding only one of the
     * two keys were silently ignored.
     *
     * @param index default index of the bulk request
     * @param type  default type of the bulk request
     * @param datas maps carrying the document id under {@code "_id"} or {@code "id"}
     * @return whether Jest reports the bulk request as succeeded
     * @throws IOException if the Jest client fails to execute the request
     */
    public boolean delete(String index, String type, List<Map<String, Object>> datas) throws IOException {
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
        for (Map<String, Object> map : datas) {
            Object docId = map.get("_id") != null ? map.get("_id") : map.get("id");
            if (docId == null) {
                continue;
            }
            bulk.addAction(new Delete.Builder(docId.toString()).build());
        }
        BulkResult br = jestClient.execute(bulk.build());
        logger.debug("delete data count: {}", datas.size());
        logger.debug("delete flag: {}", br.isSucceeded());
        return br.isSucceeded();
    }

    /**
     * Runs a query-DSL search against one index and type.
     *
     * @param index    index to search
     * @param type     type to search
     * @param queryStr query body passed to {@code Search.Builder}
     * @return the Jest search result
     * @throws IOException if the Jest client fails to execute the request
     */
    public SearchResult search(String index, String type, String queryStr) throws IOException {
        Search search = new Search.Builder(queryStr).addIndex(index).addType(type).build();
        SearchResult result = jestClient.execute(search);
        logger.info("search data count: {}", result.getTotal());
        return result;
    }

    /**
     * Executes an Elasticsearch-SQL statement and returns one map per result line, keyed by the
     * headers reported by the es4sql result extractor.
     *
     * @param sql statement to execute
     * @return result lines as header-to-value maps, in extractor order
     * @throws Exception if parsing, execution or result extraction fails
     */
    public List<Map<String, Object>> executeSQL(String sql) throws Exception {
        ObjectResult extracted = runSql(sql);
        List<String> headers = extracted.getHeaders();
        List<Map<String, Object>> returnModels = new ArrayList<>();
        for (List<Object> line : extracted.getLines()) {
            Map<String, Object> row = new HashMap<>();
            for (int i = 0; i < line.size(); i++) {
                row.put(headers.get(i), line.get(i));
            }
            returnModels.add(row);
        }
        return returnModels;
    }

    /**
     * Executes an Elasticsearch-SQL statement and returns the first {@code Double} cell of the
     * result as an integer.
     *
     * <p>The statement is now parsed the same way as in {@link #executeSQL}; before, an empty
     * {@code SQLQueryExpr} was handed to the parser and {@code sql} was never parsed.
     *
     * @param sql statement to execute, typically a count query
     * @return the first {@code Double} value formatted with pattern {@code ######0}, or 0 when
     *         the result holds no {@code Double}
     * @throws Exception if parsing, execution or result extraction fails
     */
    public Integer executeCountSQL(String sql) throws Exception {
        for (List<Object> line : runSql(sql).getLines()) {
            for (Object value : line) {
                if (value instanceof Double) {
                    Double valueTemp = (Double) value;
                    DecimalFormat df = new DecimalFormat("######0");
                    return Integer.parseInt(df.format(valueTemp));
                }
            }
        }
        return 0;
    }

    /**
     * Parses {@code sql}, executes it through the pooled transport client and extracts the
     * hits or the aggregations into an es4sql {@link ObjectResult}.
     */
    private ObjectResult runSql(String sql) throws Exception {
        SQLExprParser parser = new ElasticSqlExprParser(sql);
        SQLQueryExpr queryExpr = (SQLQueryExpr) parser.expr();
        Select select = new SqlParser().parseSelect(queryExpr);

        Client client = elasticSearchPool.getClient();
        SqlElasticSearchRequestBuilder requestBuilder;
        if (select.isAgg) {
            requestBuilder = new AggregationQueryAction(client, select).explain();
        } else {
            requestBuilder = new DefaultQueryAction(client, select).explain();
        }
        SearchResponse response = (SearchResponse) requestBuilder.get();

        // Keyword heuristic kept from the existing implementation: statements mentioning
        // GROUP, SUM or COUNT( are read from the aggregations, everything else from the hits.
        String upperSql = sql.toUpperCase(Locale.ROOT);
        boolean aggregated = upperSql.contains("GROUP")
                || upperSql.contains("SUM")
                || upperSql.contains("COUNT(");
        Object queryResult = aggregated ? response.getAggregations() : response.getHits();
        return new ObjectResultsExtractor(true, true, true).extractResults(queryResult, true);
    }

    /**
     * Executes an es-sql query by posting it to the {@code /_sql} endpoint with {@code curl}.
     *
     * <p>Returns the {@code _source} object of every hit. The list is empty when curl cannot be
     * started, when the response is empty or contains an {@code error} entry, when
     * {@code _shards.total} is not positive, or when there are no hits. The return type is left
     * raw for compatibility with existing callers; the elements are fastjson {@code JSONObject}s.
     *
     * @param sql request body passed to curl via {@code -d}
     * @return {@code _source} objects of the hits, possibly empty, never null
     */
    public List execCurl(String sql) {
        List<JSONObject> result = new ArrayList<>();
        // curl command layout: curl <host>/_sql -H "Content-Type: application/json" -d <sql>
        String[] command = {"curl", sqlEndpoint, "-H", "Content-Type: application/json", "-d", sql};
        StringBuilder builder = new StringBuilder();
        Process process = null;
        try {
            process = new ProcessBuilder(command).start();
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    process.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    builder.append(line).append(System.lineSeparator());
                }
            }
        } catch (IOException e) {
            logger.error("execCurl: running curl against {} failed", sqlEndpoint, e);
        } finally {
            if (process != null) {
                // Releases the child process and its remaining pipes.
                process.destroy();
            }
        }
        try {
            logger.info("request curlCmds:{}", objectMapper.writeValueAsString(command));
        } catch (JsonProcessingException e) {
            logger.warn("execCurl: could not serialize the curl command for logging", e);
        }

        String esResponse = builder.toString();
        logger.info("esResponse:{}", esResponse);
        JSONObject esResObj = JSONObject.parseObject(esResponse);
        if (esResObj != null && esResObj.get("error") == null) {
            JSONObject shards = esResObj.getJSONObject("_shards");
            JSONObject hits = esResObj.getJSONObject("hits");
            if (shards != null && shards.getIntValue("total") > 0 && hits != null) {
                List<Object> resList = hits.getJSONArray("hits");
                if (!CollectionUtils.isEmpty(resList)) {
                    for (Object one : resList) {
                        result.add(((JSONObject) one).getJSONObject("_source"));
                    }
                }
            }
        }
        logger.info("execCurl result:{}", result);
        return result;
    }
}