123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- package com.yihu.elasticsearch;
- import com.alibaba.druid.sql.ast.SQLExpr;
- import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
- import com.alibaba.druid.sql.parser.SQLExprParser;
- import com.alibaba.fastjson.JSONObject;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.yihu.elasticsearch.ElasticSearchPool;
- import io.searchbox.client.JestClient;
- import io.searchbox.client.JestResult;
- import io.searchbox.core.*;
- import org.apache.commons.collections.CollectionUtils;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.Client;
- import org.nlpcn.es4sql.domain.Select;
- import org.nlpcn.es4sql.jdbc.ObjectResult;
- import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
- import org.nlpcn.es4sql.parse.ElasticSqlExprParser;
- import org.nlpcn.es4sql.parse.SqlParser;
- import org.nlpcn.es4sql.query.AggregationQueryAction;
- import org.nlpcn.es4sql.query.DefaultQueryAction;
- import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Scope;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.text.DecimalFormat;
- import java.util.*;
- ;
- /**
- * Created by chenweida on 2017/6/2.
- */
- @Component
- @Scope("prototype")
- public class ElasticSearchHelper {
- private Logger logger = LoggerFactory.getLogger(ElasticSearchHelper.class);
- @Autowired
- private ElasticSearchPool elasticSearchPool;
- private JestClient jestClient;
- @Value("${es.host}")
- private String esHost;
- @Autowired
- private ObjectMapper objectMapper;
- private static String[] curlCmds = new String[6];
- @PostConstruct
- public void init() {
- jestClient = elasticSearchPool.getJestClient();
- // curl命令格式
- String url = esHost.split(",")[0].replace("http://", "").concat("/_sql");
- curlCmds[0] = "curl";
- curlCmds[1] = url;
- curlCmds[2] = "-H";
- curlCmds[3] = "Content-Type: application/json";
- curlCmds[4] = "-d";
- }
- public <T> Boolean save(String index, String type, List<T> sources) throws IOException {
- Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
- sources.forEach((item) -> {
- Index indexObj = (new Index.Builder(item)).build();
- bulk.addAction(indexObj);
- });
- BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
- this.logger.debug("save flag: " + br.isSucceeded());
- return br.isSucceeded();
- }
- public Boolean save (String index, String type, String source) throws IOException {
- Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
- Index indexObj = new Index.Builder(source).build();
- bulk.addAction(indexObj);
- BulkResult br = jestClient.execute(bulk.build());
- logger.debug("save flag: " + br.isSucceeded());
- return br.isSucceeded();
- }
- public Boolean save(String index, String type, String source,String errMsg) throws IOException {
- Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
- Index indexObj = (new Index.Builder(source)).build();
- bulk.addAction(indexObj);
- BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
- if(!br.isSucceeded()){
- this.logger.error("save flag: " + br.isSucceeded());
- errMsg = br.getErrorMessage();
- }
- return br.isSucceeded();
- }
- public Boolean saveWithCustomId(String index, String type, String source, String idFieldString,StringBuilder errMsg) throws IOException {
- Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
- JSONObject jsonObject = (JSONObject)(JSONObject.parse(source));
- Index indexObj = ((Index.Builder)(new Index.Builder(source)).id(jsonObject.getString(idFieldString))).build();
- bulk.addAction(indexObj);
- BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
- if(!br.isSucceeded()){
- errMsg.append(br.getJsonString());
- this.logger.error("save flag: " + br.isSucceeded() + " " + errMsg.toString());
- }
- return br.isSucceeded();
- }
- public Boolean saveBulkWithCustomId(String index, String type, List<String> sources, String idFieldString) throws IOException {
- Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
- sources.forEach((item) -> {
- JSONObject jsonObject = (JSONObject)((JSONObject)JSONObject.parse(item));
- Index indexObj = ((Index.Builder)(new Index.Builder(item)).id(jsonObject.getString(idFieldString))).build();
- bulk.addAction(indexObj);
- });
- BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
- this.logger.debug("save flag: " + br.isSucceeded());
- return br.isSucceeded();
- }
- public <T> Boolean update(String index, String type, List<T> sources) throws IOException {
- Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
- sources.forEach((item) -> {
- JSONObject jo = new JSONObject();
- jo.put("doc", item);
- // Update indexObj = ((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)((Builder)(new Builder(jo.toString())).index(index)).type(type)).id((item).getId())).build();
- // bulk.addAction(indexObj);
- });
- BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
- this.logger.debug("update flag: " + br.isSucceeded());
- return br.isSucceeded();
- }
- public Boolean updateByMap(String index, String type, List<Map<String, Object>> list) throws IOException {
- Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
- Iterator var10 = list.iterator();
- while(var10.hasNext()) {
- Map map = (Map)var10.next();
- JSONObject jo = new JSONObject();
- jo.put("doc", map);
- Update indexObj = ((Update.Builder)((Update.Builder)((Update.Builder)(new Update.Builder(jo.toString())).index(index)).type(type)).id(String.valueOf(map.get("id")))).build();
- bulk.addAction(indexObj);
- }
- BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
- this.logger.debug("update flag: " + br.isSucceeded());
- return br.isSucceeded();
- }
- public boolean update(String index, String type, String _id, JSONObject source) throws IOException {
- JSONObject docSource = new JSONObject();
- docSource.put("doc", source);
- Update update = ((Update.Builder)((Update.Builder)((Update.Builder)(new Update.Builder(docSource)).index(index)).type(type)).id(_id)).build();
- JestResult jestResult = this.jestClient.execute(update);
- this.logger.debug("update info: " + jestResult.isSucceeded());
- return true;
- }
- public boolean delete(String index, String type, List<Map<String, Object>> datas) throws IOException {
- Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
- Iterator var5 = datas.iterator();
- while(var5.hasNext()) {
- Map map = (Map)var5.next();
- if (map.containsKey("id") && map.containsKey("_id")) {
- Delete indexObj = null;
- if (null != map.get("_id")) {
- indexObj = (new Delete.Builder(map.get("_id").toString())).build();
- } else if (null != map.get("id")) {
- indexObj = (new Delete.Builder(map.get("id").toString())).build();
- }
- bulk.addAction(indexObj);
- }
- }
- BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
- this.logger.debug("delete data count: " + datas.size());
- this.logger.debug("delete flag: " + br.isSucceeded());
- return br.isSucceeded();
- }
- public SearchResult search(String index, String type, String queryStr) throws IOException {
- Search search = ((Search.Builder)((Search.Builder)(new Search.Builder(queryStr)).addIndex(index)).addType(type)).build();
- SearchResult result = (SearchResult)this.jestClient.execute(search);
- this.logger.info("search data count: " + result.getTotal());
- return result;
- }
- public List<Map<String, Object>> executeSQL(String sql) throws Exception {
- List<Map<String, Object>> returnModels = new ArrayList();
- SQLExprParser parser = new ElasticSqlExprParser(sql);
- SQLExpr expr = parser.expr();
- SQLQueryExpr queryExpr = (SQLQueryExpr)expr;
- // SQLBinaryExpr queryExpr = (SQLBinaryExpr)expr;
- Select select = (new SqlParser()).parseSelect(queryExpr);
- AggregationQueryAction action = null;
- DefaultQueryAction queryAction = null;
- SqlElasticSearchRequestBuilder requestBuilder = null;
- if (select.isAgg) {
- action = new AggregationQueryAction(this.elasticSearchPool.getClient(), select);
- requestBuilder = action.explain();
- } else {
- Client client = this.elasticSearchPool.getClient();
- queryAction = new DefaultQueryAction(client, select);
- requestBuilder = queryAction.explain();
- }
- SearchResponse response = (SearchResponse)requestBuilder.get();
- Object queryResult = null;
- if (sql.toUpperCase().indexOf("GROUP") == -1 && sql.toUpperCase().indexOf("SUM") == -1 && sql.toUpperCase().indexOf("COUNT(") == -1) {
- queryResult = response.getHits();
- } else {
- queryResult = response.getAggregations();
- }
- ObjectResult temp = (new ObjectResultsExtractor(true, true, true)).extractResults(queryResult, true);
- List<String> heads = temp.getHeaders();
- temp.getLines().stream().forEach((one) -> {
- Map<String, Object> oneMap = new HashMap();
- for(int i = 0; i < one.size(); ++i) {
- Object value = one.get(i);
- String key = (String)heads.get(i);
- oneMap.put(key, value);
- }
- returnModels.add(oneMap);
- });
- return returnModels;
- }
- public Integer executeCountSQL(String sql) throws Exception {
- SQLQueryExpr queryExpr = new SQLQueryExpr();
- Select select = (new SqlParser()).parseSelect(queryExpr);
- SqlElasticSearchRequestBuilder requestBuilder;
- if (select.isAgg) {
- AggregationQueryAction action = new AggregationQueryAction(this.elasticSearchPool.getClient(), select);
- requestBuilder = action.explain();
- } else {
- Client client = this.elasticSearchPool.getClient();
- DefaultQueryAction queryAction = new DefaultQueryAction(client, select);
- requestBuilder = queryAction.explain();
- }
- SearchResponse response = (SearchResponse)requestBuilder.get();
- Object queryResult;
- if (sql.toUpperCase().indexOf("GROUP") == -1 && sql.toUpperCase().indexOf("SUM") == -1 && sql.toUpperCase().indexOf("COUNT(") == -1) {
- queryResult = response.getHits();
- } else {
- queryResult = response.getAggregations();
- }
- ObjectResult temp = (new ObjectResultsExtractor(true, true, true)).extractResults(queryResult, true);
- for(int j = 0; j < temp.getLines().size(); ++j) {
- List<Object> one = (List)temp.getLines().get(j);
- for(int i = 0; i < one.size(); ++i) {
- Object value = one.get(i);
- if (value instanceof Double) {
- Double valueTemp = (Double)value;
- DecimalFormat df = new DecimalFormat("######0");
- return Integer.parseInt(df.format(valueTemp));
- }
- }
- }
- return 0;
- }
- /**
- * 使用curl方式执行 es-sql 查询操作
- * @param sql
- * @return
- */
- public List<JSONObject> execCurl(String sql){
- List<JSONObject> result = new ArrayList<>();
- curlCmds[5] = sql;
- ProcessBuilder process = new ProcessBuilder(curlCmds);
- Process p;
- StringBuilder builder = new StringBuilder();
- try {
- p = process.start();
- BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
- String line = null;
- while ((line = reader.readLine()) != null) {
- builder.append(line);
- builder.append(System.getProperty("line.separator"));
- }
- } catch (IOException e) {
- System.out.print("error");
- e.printStackTrace();
- }
- try {
- logger.info("request curlCmds:" + objectMapper.writeValueAsString(curlCmds));
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- String esResponse = builder.toString();
- logger.info("esResponse:" + esResponse);
- JSONObject esResObj = JSONObject.parseObject(esResponse);
- if(null != esResObj){
- if(null == (esResObj.get("error"))){
- if(esResObj.getJSONObject("_shards").getIntValue("total") > 0){
- List resList = JSONObject.parseObject(esResponse).getJSONObject("hits").getJSONArray("hits");
- if(!CollectionUtils.isEmpty(resList)){
- resList.forEach(
- one->{
- JSONObject oneHit = (JSONObject)one;
- result.add(oneHit.getJSONObject("_source"));
- }
- );
- }
- }
- }
- }
- logger.info("execCurl result:"+result.toString());
- return result;
- }
- }
|