|
@ -0,0 +1,333 @@
|
|
|
package com.yihu.elasticsearch;
|
|
|
|
|
|
import com.alibaba.druid.sql.ast.SQLExpr;
|
|
|
import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
|
|
|
import com.alibaba.druid.sql.parser.SQLExprParser;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.yihu.elasticsearch.ElasticSearchPool;
|
|
|
import io.searchbox.client.JestClient;
|
|
|
import io.searchbox.client.JestResult;
|
|
|
import io.searchbox.core.*;
|
|
|
import org.apache.commons.collections.CollectionUtils;
|
|
|
import org.elasticsearch.action.search.SearchResponse;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
import org.nlpcn.es4sql.domain.Select;
|
|
|
import org.nlpcn.es4sql.jdbc.ObjectResult;
|
|
|
import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
|
|
|
import org.nlpcn.es4sql.parse.ElasticSqlExprParser;
|
|
|
import org.nlpcn.es4sql.parse.SqlParser;
|
|
|
import org.nlpcn.es4sql.query.AggregationQueryAction;
|
|
|
import org.nlpcn.es4sql.query.DefaultQueryAction;
|
|
|
import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.context.annotation.Scope;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
import java.io.BufferedReader;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStreamReader;
|
|
|
import java.text.DecimalFormat;
|
|
|
import java.util.*;
|
|
|
|
|
|
;
|
|
|
|
|
|
/**
|
|
|
* Created by chenweida on 2017/6/2.
|
|
|
*/
|
|
|
@Component
|
|
|
@Scope("prototype")
|
|
|
public class ElasticSearchHelper {
|
|
|
|
|
|
private Logger logger = LoggerFactory.getLogger(ElasticSearchHelper.class);
|
|
|
@Autowired
|
|
|
private ElasticSearchPool elasticSearchPool;
|
|
|
|
|
|
private JestClient jestClient;
|
|
|
|
|
|
@Value("${es.host}")
|
|
|
private String esHost;
|
|
|
|
|
|
@Autowired
|
|
|
private ObjectMapper objectMapper;
|
|
|
|
|
|
private static String[] curlCmds = new String[6];
|
|
|
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
jestClient = elasticSearchPool.getJestClient();
|
|
|
// curl命令格式
|
|
|
String url = esHost.split(",")[0].replace("http://", "").concat("/_sql");
|
|
|
curlCmds[0] = "curl";
|
|
|
curlCmds[1] = url;
|
|
|
curlCmds[2] = "-H";
|
|
|
curlCmds[3] = "Content-Type: application/json";
|
|
|
curlCmds[4] = "-d";
|
|
|
}
|
|
|
|
|
|
public <T> Boolean save(String index, String type, List<T> sources) throws IOException {
|
|
|
Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
|
|
|
sources.forEach((item) -> {
|
|
|
Index indexObj = (new Index.Builder(item)).build();
|
|
|
bulk.addAction(indexObj);
|
|
|
});
|
|
|
BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
|
|
|
this.logger.debug("save flag: " + br.isSucceeded());
|
|
|
return br.isSucceeded();
|
|
|
}
|
|
|
|
|
|
public Boolean save (String index, String type, String source) throws IOException {
|
|
|
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(index).defaultType(type);
|
|
|
Index indexObj = new Index.Builder(source).build();
|
|
|
bulk.addAction(indexObj);
|
|
|
BulkResult br = jestClient.execute(bulk.build());
|
|
|
logger.debug("save flag: " + br.isSucceeded());
|
|
|
return br.isSucceeded();
|
|
|
}
|
|
|
|
|
|
public Boolean save(String index, String type, String source,String errMsg) throws IOException {
|
|
|
Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
|
|
|
Index indexObj = (new Index.Builder(source)).build();
|
|
|
bulk.addAction(indexObj);
|
|
|
BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
|
|
|
if(!br.isSucceeded()){
|
|
|
this.logger.error("save flag: " + br.isSucceeded());
|
|
|
errMsg = br.getErrorMessage();
|
|
|
}
|
|
|
return br.isSucceeded();
|
|
|
}
|
|
|
|
|
|
public Boolean saveWithCustomId(String index, String type, String source, String idFieldString,StringBuilder errMsg) throws IOException {
|
|
|
Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
|
|
|
JSONObject jsonObject = (JSONObject)(JSONObject.parse(source));
|
|
|
Index indexObj = ((Index.Builder)(new Index.Builder(source)).id(jsonObject.getString(idFieldString))).build();
|
|
|
bulk.addAction(indexObj);
|
|
|
BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
|
|
|
if(!br.isSucceeded()){
|
|
|
errMsg.append(br.getJsonString());
|
|
|
this.logger.error("save flag: " + br.isSucceeded() + " " + errMsg.toString());
|
|
|
}
|
|
|
return br.isSucceeded();
|
|
|
}
|
|
|
|
|
|
public Boolean saveBulkWithCustomId(String index, String type, List<String> sources, String idFieldString) throws IOException {
|
|
|
Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
|
|
|
sources.forEach((item) -> {
|
|
|
JSONObject jsonObject = (JSONObject)((JSONObject)JSONObject.parse(item));
|
|
|
Index indexObj = ((Index.Builder)(new Index.Builder(item)).id(jsonObject.getString(idFieldString))).build();
|
|
|
bulk.addAction(indexObj);
|
|
|
});
|
|
|
BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
|
|
|
this.logger.debug("save flag: " + br.isSucceeded());
|
|
|
return br.isSucceeded();
|
|
|
}
|
|
|
|
|
|
public <T> Boolean update(String index, String type, List<T> sources) throws IOException {
|
|
|
Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
|
|
|
sources.forEach((item) -> {
|
|
|
JSONObject jo = new JSONObject();
|
|
|
jo.put("doc", item);
|
|
|
// Update indexObj = ((io.searchbox.core.Update.Builder)((io.searchbox.core.Update.Builder)((Builder)(new Builder(jo.toString())).index(index)).type(type)).id((item).getId())).build();
|
|
|
// bulk.addAction(indexObj);
|
|
|
});
|
|
|
BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
|
|
|
this.logger.debug("update flag: " + br.isSucceeded());
|
|
|
return br.isSucceeded();
|
|
|
}
|
|
|
|
|
|
public Boolean updateByMap(String index, String type, List<Map<String, Object>> list) throws IOException {
|
|
|
Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
|
|
|
Iterator var10 = list.iterator();
|
|
|
|
|
|
while(var10.hasNext()) {
|
|
|
Map map = (Map)var10.next();
|
|
|
JSONObject jo = new JSONObject();
|
|
|
jo.put("doc", map);
|
|
|
Update indexObj = ((Update.Builder)((Update.Builder)((Update.Builder)(new Update.Builder(jo.toString())).index(index)).type(type)).id(String.valueOf(map.get("id")))).build();
|
|
|
bulk.addAction(indexObj);
|
|
|
}
|
|
|
|
|
|
BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
|
|
|
this.logger.debug("update flag: " + br.isSucceeded());
|
|
|
return br.isSucceeded();
|
|
|
}
|
|
|
|
|
|
public boolean update(String index, String type, String _id, JSONObject source) throws IOException {
|
|
|
JSONObject docSource = new JSONObject();
|
|
|
docSource.put("doc", source);
|
|
|
Update update = ((Update.Builder)((Update.Builder)((Update.Builder)(new Update.Builder(docSource)).index(index)).type(type)).id(_id)).build();
|
|
|
JestResult jestResult = this.jestClient.execute(update);
|
|
|
this.logger.debug("update info: " + jestResult.isSucceeded());
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
public boolean delete(String index, String type, List<Map<String, Object>> datas) throws IOException {
|
|
|
Bulk.Builder bulk = (new Bulk.Builder()).defaultIndex(index).defaultType(type);
|
|
|
Iterator var5 = datas.iterator();
|
|
|
|
|
|
while(var5.hasNext()) {
|
|
|
Map map = (Map)var5.next();
|
|
|
if (map.containsKey("id") && map.containsKey("_id")) {
|
|
|
Delete indexObj = null;
|
|
|
if (null != map.get("_id")) {
|
|
|
indexObj = (new Delete.Builder(map.get("_id").toString())).build();
|
|
|
} else if (null != map.get("id")) {
|
|
|
indexObj = (new Delete.Builder(map.get("id").toString())).build();
|
|
|
}
|
|
|
|
|
|
bulk.addAction(indexObj);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
BulkResult br = (BulkResult)this.jestClient.execute(bulk.build());
|
|
|
this.logger.debug("delete data count: " + datas.size());
|
|
|
this.logger.debug("delete flag: " + br.isSucceeded());
|
|
|
return br.isSucceeded();
|
|
|
}
|
|
|
|
|
|
public SearchResult search(String index, String type, String queryStr) throws IOException {
|
|
|
Search search = ((Search.Builder)((Search.Builder)(new Search.Builder(queryStr)).addIndex(index)).addType(type)).build();
|
|
|
SearchResult result = (SearchResult)this.jestClient.execute(search);
|
|
|
this.logger.info("search data count: " + result.getTotal());
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
public List<Map<String, Object>> executeSQL(String sql) throws Exception {
|
|
|
List<Map<String, Object>> returnModels = new ArrayList();
|
|
|
SQLExprParser parser = new ElasticSqlExprParser(sql);
|
|
|
SQLExpr expr = parser.expr();
|
|
|
SQLQueryExpr queryExpr = (SQLQueryExpr)expr;
|
|
|
// SQLBinaryExpr queryExpr = (SQLBinaryExpr)expr;
|
|
|
Select select = (new SqlParser()).parseSelect(queryExpr);
|
|
|
AggregationQueryAction action = null;
|
|
|
DefaultQueryAction queryAction = null;
|
|
|
SqlElasticSearchRequestBuilder requestBuilder = null;
|
|
|
if (select.isAgg) {
|
|
|
action = new AggregationQueryAction(this.elasticSearchPool.getClient(), select);
|
|
|
requestBuilder = action.explain();
|
|
|
} else {
|
|
|
Client client = this.elasticSearchPool.getClient();
|
|
|
queryAction = new DefaultQueryAction(client, select);
|
|
|
requestBuilder = queryAction.explain();
|
|
|
}
|
|
|
|
|
|
SearchResponse response = (SearchResponse)requestBuilder.get();
|
|
|
Object queryResult = null;
|
|
|
if (sql.toUpperCase().indexOf("GROUP") == -1 && sql.toUpperCase().indexOf("SUM") == -1 && sql.toUpperCase().indexOf("COUNT(") == -1) {
|
|
|
queryResult = response.getHits();
|
|
|
} else {
|
|
|
queryResult = response.getAggregations();
|
|
|
}
|
|
|
|
|
|
ObjectResult temp = (new ObjectResultsExtractor(true, true, true)).extractResults(queryResult, true);
|
|
|
List<String> heads = temp.getHeaders();
|
|
|
temp.getLines().stream().forEach((one) -> {
|
|
|
Map<String, Object> oneMap = new HashMap();
|
|
|
|
|
|
for(int i = 0; i < one.size(); ++i) {
|
|
|
Object value = one.get(i);
|
|
|
String key = (String)heads.get(i);
|
|
|
oneMap.put(key, value);
|
|
|
}
|
|
|
|
|
|
returnModels.add(oneMap);
|
|
|
});
|
|
|
return returnModels;
|
|
|
}
|
|
|
|
|
|
public Integer executeCountSQL(String sql) throws Exception {
|
|
|
SQLQueryExpr queryExpr = new SQLQueryExpr();
|
|
|
Select select = (new SqlParser()).parseSelect(queryExpr);
|
|
|
SqlElasticSearchRequestBuilder requestBuilder;
|
|
|
if (select.isAgg) {
|
|
|
AggregationQueryAction action = new AggregationQueryAction(this.elasticSearchPool.getClient(), select);
|
|
|
requestBuilder = action.explain();
|
|
|
} else {
|
|
|
Client client = this.elasticSearchPool.getClient();
|
|
|
DefaultQueryAction queryAction = new DefaultQueryAction(client, select);
|
|
|
requestBuilder = queryAction.explain();
|
|
|
}
|
|
|
|
|
|
SearchResponse response = (SearchResponse)requestBuilder.get();
|
|
|
Object queryResult;
|
|
|
if (sql.toUpperCase().indexOf("GROUP") == -1 && sql.toUpperCase().indexOf("SUM") == -1 && sql.toUpperCase().indexOf("COUNT(") == -1) {
|
|
|
queryResult = response.getHits();
|
|
|
} else {
|
|
|
queryResult = response.getAggregations();
|
|
|
}
|
|
|
|
|
|
ObjectResult temp = (new ObjectResultsExtractor(true, true, true)).extractResults(queryResult, true);
|
|
|
|
|
|
for(int j = 0; j < temp.getLines().size(); ++j) {
|
|
|
List<Object> one = (List)temp.getLines().get(j);
|
|
|
|
|
|
for(int i = 0; i < one.size(); ++i) {
|
|
|
Object value = one.get(i);
|
|
|
if (value instanceof Double) {
|
|
|
Double valueTemp = (Double)value;
|
|
|
DecimalFormat df = new DecimalFormat("######0");
|
|
|
return Integer.parseInt(df.format(valueTemp));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 使用curl方式执行 es-sql 查询操作
|
|
|
* @param sql
|
|
|
* @return
|
|
|
*/
|
|
|
public List<JSONObject> execCurl(String sql){
|
|
|
List<JSONObject> result = new ArrayList<>();
|
|
|
curlCmds[5] = sql;
|
|
|
ProcessBuilder process = new ProcessBuilder(curlCmds);
|
|
|
Process p;
|
|
|
StringBuilder builder = new StringBuilder();
|
|
|
try {
|
|
|
p = process.start();
|
|
|
BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
|
|
|
String line = null;
|
|
|
while ((line = reader.readLine()) != null) {
|
|
|
builder.append(line);
|
|
|
builder.append(System.getProperty("line.separator"));
|
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
System.out.print("error");
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
try {
|
|
|
logger.info("request curlCmds:" + objectMapper.writeValueAsString(curlCmds));
|
|
|
} catch (JsonProcessingException e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
String esResponse = builder.toString();
|
|
|
logger.info("esResponse:" + esResponse);
|
|
|
JSONObject esResObj = JSONObject.parseObject(esResponse);
|
|
|
if(null != esResObj){
|
|
|
if(null == (esResObj.get("error"))){
|
|
|
if(esResObj.getJSONObject("_shards").getIntValue("total") > 0){
|
|
|
List resList = JSONObject.parseObject(esResponse).getJSONObject("hits").getJSONArray("hits");
|
|
|
if(!CollectionUtils.isEmpty(resList)){
|
|
|
resList.forEach(
|
|
|
one->{
|
|
|
JSONObject oneHit = (JSONObject)one;
|
|
|
result.add(oneHit.getJSONObject("_source"));
|
|
|
}
|
|
|
);
|
|
|
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
logger.info("execCurl result:"+result.toString());
|
|
|
return result;
|
|
|
}
|
|
|
}
|