package com.yihu.quota.util; import com.yihu.quota.model.rest.FieldInfo; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequest; import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.client.Requests; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; /** * Created by janseny on 2018/9/17. */ @Configuration @Component public class ElasticSearchHandler { private static Logger logger = LoggerFactory.getLogger(ElasticSearchHandler.class); @Value("${elasticsearch.cluster-name}") private String clusterName; @Value("${elasticsearch.cluster-nodes}") private String ip; /** * 取得实例 * @return */ public synchronized TransportClient getTransportClient() { TransportClient client = null ; try { Settings settings = Settings.builder().put("cluster.name", clusterName) /* .put("client.transport.sniff", true)*/ .put("client.transport.ping_timeout", "30s").build(); String host = ip.split(";")[0]; host = host.substring(0,host.indexOf(":")); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300)); } catch (UnknownHostException e) { e.printStackTrace(); } return client; } /** * 关闭连接 * @param client es客户端 */ public void close(TransportClient client) { client.close(); } /** * 为集群添加新的节点 * @param nodeName * @param client es客户端 */ public synchronized void addNode(String nodeName,TransportClient client) { try { client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(nodeName), 9300)); close(client); } catch (UnknownHostException e) { e.printStackTrace(); } } /** * 删除集群中的某个节点 * @param client es客户端 * @param name */ public synchronized void removeNode(String name,TransportClient client) { try { client.removeTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(name), 9300)); close(client); } catch (UnknownHostException e) { e.printStackTrace(); } } /** * 创建索引 * * @param index 索引名称 * @param client es客户端 */ public void createIndex(String index,TransportClient client) { CreateIndexRequest request = new CreateIndexRequest(index); client.admin().indices().create(request); close(client); } /** * 判断指定的索引名是否存在 * @param indexName 索引名 * @return 存在:true; 不存在:false; */ public boolean isExistsIndex(String indexName,TransportClient client){ IndicesExistsResponse response = client.admin().indices().exists( new IndicesExistsRequest().indices(new String[]{indexName})).actionGet(); close(client); return response.isExists(); } /** * 判断指定的索引的类型是否存在 * @param indexName 索引名 * @param indexType 索引类型 * @return 存在:true; 不存在:false; */ public boolean isExistsType(String indexName,String indexType,TransportClient client){ TypesExistsResponse response = client.admin().indices() .typesExists(new TypesExistsRequest(new String[]{indexName}, indexType)).actionGet(); boolean isEXist = response.isExists(); close(client); return isEXist; } /** * 创建mapping * @param index 索引 * @param type 类型 * @param client es客户端 * @param xMapping mapping描述 */ public void createBangMapping(String index, String type, XContentBuilder xMapping,TransportClient client) { PutMappingRequest mapping = Requests.putMappingRequest(index).type(type).source(xMapping); client.admin().indices().putMapping(mapping).actionGet(); close(client); } /** * 根据信息自动创建索引与mapping * 构建mapping描述 有问题 * @param fieldInfoList * @param client es客户端 * @return */ public void createIndexAndCreateMapping(String index, String type,List fieldInfoList,TransportClient client) { XContentBuilder mapping = null; try { CreateIndexRequestBuilder cib = client.admin() .indices().prepareCreate(index); mapping = XContentFactory.jsonBuilder() .startObject().startObject("properties"); //设置之定义字段 for(FieldInfo info : fieldInfoList){ String field = info.getField(); String dataType = info.getDataType(); if(dataType == null || "".equals(dataType.trim())){ dataType = "String"; } dataType = dataType.toLowerCase(); Integer participle = info.getParticiple(); if("string".equals(dataType)){ mapping.startObject(field).field("type", "string").field("index", "not_analyzed").endObject(); }else if("date".equals(dataType)){ mapping.startObject(field).field("type", "date").field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis").endObject(); }else if("object".equals(dataType)){//对象属性 mapping.startObject(field).startObject("properties"); //子属性 List childFieldList = info.getFieldInfos(); for(FieldInfo child : childFieldList){ String childField = child.getField(); String childDataType = child.getDataType(); int childParticiple = child.getParticiple(); if("string".equals(childDataType)){ mapping.startObject(childField).field("type", "string").field("index", "not_analyzed").endObject(); }else if("date".equals(dataType)){ mapping.startObject(childField).field("type", "date").field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis").endObject(); }else { mapping.startObject(childField) .field("type", childDataType).endObject(); } } mapping.endObject().endObject(); }else if("nested".equals(dataType)){//子集属性 mapping.startObject(field).field("type", "nested").startObject("properties"); //子属性 List childFieldList = info.getFieldInfos(); for(FieldInfo child : childFieldList){ String childField = child.getField(); String childDataType = child.getDataType(); int childParticiple = child.getParticiple(); if("string".equals(childDataType)){ mapping.startObject(childField).field("type", "string").field("index", "not_analyzed").endObject(); }else if("date".equals(dataType)){ mapping.startObject(childField).field("type", "date").field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis").endObject(); }else { mapping.startObject(childField) .field("type", childDataType).endObject(); } } mapping.endObject().endObject(); }else { mapping.startObject(field) .field("type", dataType).endObject(); } } mapping.endObject() .endObject(); cib.addMapping(type, mapping); cib.execute().actionGet(); close(client); } catch (IOException e) { logger.debug("创建索引发生异常"); } } /** * 根据模板自动创建索引与mapping * @param index 索引字段 * @param type 类型 * @param client 客户端 * @throws IOException */ public void createMappingByTemplate(String index, String type,TransportClient client) throws IOException { CreateIndexRequestBuilder cib=client.admin() .indices().prepareCreate(index); XContentBuilder mapping = XContentFactory.jsonBuilder() .startObject() .startObject("properties") //设置之定义字段 .startObject("id") .field("type", "integer").field("index", "not_analyzed").endObject() .startObject("classs").field("type", "integer").field("index", "not_analyzed").endObject() .startObject("score").field("type", "integer").field("index", "not_analyzed").endObject() //子属性 .startObject("student") .startObject("properties") .startObject("name").field("type", "string").field("index", "not_analyzed").endObject() .startObject("age").field("type", "string").endObject() .endObject() .endObject() .startObject("updatetime").field("type", "date").field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis").endObject() .endObject() .endObject(); cib.addMapping(type, mapping); cib.execute().actionGet(); close(client); } }