Progr1mmer vor 6 Jahren
Ursprung
Commit
43931c289e

+ 41 - 0
elasticsearch-transport-starter/pom.xml

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>jkzl-starter</artifactId>
        <groupId>com.yihu</groupId>
        <version>2.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>elasticsearch-transport-starter</artifactId>
    <packaging>jar</packaging>
    <dependencies>
        <!-- Spring framework (versions managed by the parent POM) -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>
        <!-- Elasticsearch transport client + SQL-over-ES adapter -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.nlpcn</groupId>
            <artifactId>elasticsearch-sql</artifactId>
        </dependency>
        <!-- Spring Data commons: supplies the Page/PageImpl/PageRequest types used by ElasticSearchUtil -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>
    </dependencies>
</project>

+ 81 - 0
elasticsearch-transport-starter/src/main/java/com/yihu/elasticsearch/transport/ElasticSearchPool.java

@ -0,0 +1,81 @@
package com.yihu.elasticsearch.transport;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.ElasticSearchDruidDataSourceFactory;
import com.yihu.elasticsearch.transport.config.ElasticSearchConfig;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.net.InetSocketAddress;
import java.util.Properties;
/**
 * Singleton holder for a shared Elasticsearch {@link TransportClient}.
 *
 * <p>Created by progr1mmer on 2018/1/4.
 */
@Component
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class ElasticSearchPool {
    // Shared client; volatile so the double-checked locking in getClient()
    // publishes the instance safely across threads.
    private static volatile TransportClient transportClient;
    @Autowired
    private ElasticSearchConfig elasticSearchConfig;
    /**
     * Builds a fresh TransportClient from the configured cluster name and the
     * comma-separated "host:port[,host:port...]" node list. Blank entries in
     * the node list are skipped (previously they left null slots in the
     * address array, which would fail inside addTransportAddresses).
     */
    private TransportClient getTransportClient() {
        Settings settings = Settings.builder()
                .put("cluster.name", elasticSearchConfig.getClusterName())
                .put("client.transport.sniff", false)
                .build();
        String[] nodeArr = elasticSearchConfig.getClusterNodes().split(",");
        InetSocketTransportAddress[] socketArr = new InetSocketTransportAddress[nodeArr.length];
        int count = 0;
        for (String node : nodeArr) {
            if (!StringUtils.isEmpty(node)) {
                // Each node entry is expected as "host:port" — TODO confirm with config source.
                String[] nodeInfo = node.split(":");
                socketArr[count++] = new InetSocketTransportAddress(
                        new InetSocketAddress(nodeInfo[0], Integer.parseInt(nodeInfo[1])));
            }
        }
        if (count < socketArr.length) {
            // Compact away slots left empty by blank node entries.
            socketArr = java.util.Arrays.copyOf(socketArr, count);
        }
        return TransportClient.builder().settings(settings).build().addTransportAddresses(socketArr);
    }
    /**
     * Returns the shared TransportClient, (re)creating it lazily.
     *
     * <p>Notes (translated from the original author's comments):
     * 1. TransportClient itself supports multi-threaded requests.
     * 2. A pool of multiple TransportClients was removed to reduce socket connections.
     * 3. Double-checked locking balances safety and efficiency.
     * 4. Callers must NOT call transportClient.close() after use — the instance is shared.
     *
     * <p>NOTE(review): when a disconnected client is replaced, the stale instance
     * is not closed; its resources are leaked. Confirm whether closing it here
     * is safe for in-flight requests before changing.
     *
     * @return the shared, connected TransportClient
     */
    public TransportClient getClient() {
        if (transportClient != null) {
            if (transportClient.connectedNodes().isEmpty()) {
                synchronized (TransportClient.class) {
                    if (transportClient.connectedNodes().isEmpty()) {
                        transportClient = getTransportClient();
                    }
                }
            }
            return transportClient;
        }
        synchronized (TransportClient.class) {
            if (null == transportClient) {
                transportClient = getTransportClient();
            }
        }
        return transportClient;
    }
    /**
     * Creates a new Druid data source speaking the elasticsearch-sql JDBC
     * dialect against the configured nodes. A NEW pool is created on every
     * call — the caller owns it and must close() it when done.
     *
     * @return a freshly created DruidDataSource (initial size 1)
     * @throws Exception if the factory cannot create the data source
     */
    public DruidDataSource getDruidDataSource() throws Exception {
        Properties properties = new Properties();
        properties.put("url", "jdbc:elasticsearch://" + elasticSearchConfig.getClusterNodes() + "/");
        DruidDataSource druidDataSource = (DruidDataSource) ElasticSearchDruidDataSourceFactory
                .createDataSource(properties);
        druidDataSource.setInitialSize(1);
        return druidDataSource;
    }
}

+ 746 - 0
elasticsearch-transport-starter/src/main/java/com/yihu/elasticsearch/transport/ElasticSearchUtil.java

@ -0,0 +1,746 @@
package com.yihu.elasticsearch.transport;
import com.alibaba.druid.pool.DruidDataSource;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.InternalCardinality;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.ParseException;
import java.util.*;
/**
 * Util - Elasticsearch search service built on the shared TransportClient
 * from {@link ElasticSearchPool}, plus SQL access via elasticsearch-sql/Druid.
 *
 * <p>Filter mini-language accepted by the String-based methods (see
 * {@link #getQueryBuilder(String)}): clauses separated by {@code ;}, each of
 * {@code a=b} (term / terms if b contains commas), {@code a<>b} (must-not),
 * {@code a>=b}, {@code a>b}, {@code a<=b}, {@code a<b} (range),
 * {@code a?b} (match-phrase), {@code a=b||c=d} (should-group).
 * Sort mini-language (see {@link #getSortBuilder(String)}): {@code ;}-separated
 * fields, each prefixed with {@code +} (asc) or {@code -} (desc).
 *
 * <p>Created by progr1mmer on 2017/12/2.
 */
@Service
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class ElasticSearchUtil {
    @Autowired
    private ElasticSearchPool elasticSearchPool;
    /**
     * Creates an index with a type mapping.
     * Note: saving data without first mapping the fields can make searches inaccurate.
     *
     * @param index   index name
     * @param type    mapping type name
     * @param source  field name -> (property name -> property value), written as the "properties" object
     * @param setting optional index settings (e.g. index.refresh_interval,
     *                index.number_of_replicas, index.number_of_shards,
     *                index.translog.*); ignored when null or empty
     * @throws IOException if building the mapping JSON fails
     */
    public void mapping (String index, String type, Map<String, Map<String, String>> source, Map<String, Object> setting) throws IOException{
        TransportClient transportClient = elasticSearchPool.getClient();
        XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("properties");
        for (String field : source.keySet()) {
            xContentBuilder.startObject(field);
            Map<String, String> propsMap = source.get(field);
            for (String prop : propsMap.keySet()) {
                xContentBuilder.field(prop, propsMap.get(prop));
            }
            xContentBuilder.endObject();
        }
        xContentBuilder.endObject().endObject();
        CreateIndexRequestBuilder createIndexRequestBuilder = transportClient.admin().indices().prepareCreate(index);
        createIndexRequestBuilder.addMapping(type, xContentBuilder);
        if (setting != null && !setting.isEmpty()) {
            createIndexRequestBuilder.setSettings(setting);
        }
        createIndexRequestBuilder.get();
    }
    /**
     * Deletes an entire index.
     *
     * @param index index name to delete
     */
    public void remove (String index){
        TransportClient transportClient = elasticSearchPool.getClient();
        DeleteIndexRequestBuilder deleteIndexRequestBuilder = transportClient.admin().indices().prepareDelete(index);
        deleteIndexRequestBuilder.get();
    }
    /**
     * Indexes a single document. If {@code source} carries an "_id" entry it is
     * removed from the body and used as the document id; otherwise ES assigns one.
     * On return, {@code source} is mutated to contain the final "_id".
     *
     * @param index  index name
     * @param type   type name
     * @param source document body (mutated: "_id" removed then re-added)
     * @return the same {@code source} map with "_id" set to the stored id
     * @throws ParseException declared for API compatibility
     */
    public Map<String, Object> index (String index, String type, Map<String, Object> source) throws ParseException{
        TransportClient transportClient = elasticSearchPool.getClient();
        String _id = (String) source.remove("_id");
        // Both branches differ only in whether the id is supplied.
        IndexResponse response = StringUtils.isEmpty(_id)
                ? transportClient.prepareIndex(index, type).setSource(source).get()
                : transportClient.prepareIndex(index, type, _id).setSource(source).get();
        source.put("_id", response.getId());
        return source;
    }
    /**
     * Bulk-indexes documents in a single request (more efficient than one-by-one).
     * Each item's "_id" entry, when present, is removed and used as the document id.
     *
     * @param index  index name
     * @param type   type name
     * @param source documents to index (each map is mutated: "_id" removed)
     * @throws ParseException declared for API compatibility
     */
    public void bulkIndex (String index, String type, List<Map<String, Object>> source) throws ParseException{
        if (source.size() > 0) {
            TransportClient transportClient = elasticSearchPool.getClient();
            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
            source.forEach(item -> {
                String _id = (String) item.remove("_id");
                if (StringUtils.isEmpty(_id)) {
                    bulkRequestBuilder.add(transportClient.prepareIndex(index, type).setSource(item));
                } else {
                    bulkRequestBuilder.add(transportClient.prepareIndex(index, type, _id).setSource(item));
                }
            });
            bulkRequestBuilder.get();
        }
    }
    /**
     * Deletes a single document by id.
     *
     * @param index index name
     * @param type  type name
     * @param id    document id
     */
    public void delete (String index, String type, String id) {
        TransportClient transportClient = elasticSearchPool.getClient();
        transportClient.prepareDelete(index, type, id).get();
    }
    /**
     * Bulk-deletes documents by id.
     *
     * @param index index name
     * @param type  type name
     * @param idArr document ids to delete; no-op when empty
     */
    public void bulkDelete (String index, String type, String [] idArr) {
        if (idArr.length > 0) {
            TransportClient transportClient = elasticSearchPool.getClient();
            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
            for (String id : idArr) {
                bulkRequestBuilder.add(transportClient.prepareDelete(index, type, id));
            }
            bulkRequestBuilder.get();
        }
    }
    /**
     * Deletes all documents whose {@code field} equals {@code value}.
     *
     * @param index index name
     * @param type  type name
     * @param field field name
     * @param value value to match (term query)
     */
    public void deleteByField(String index, String type, String field, Object value) {
        deleteByFilter(index, type, field + "=" + value);
    }
    /**
     * Deletes all documents matching a filter expression (see class javadoc).
     *
     * @param index   index name
     * @param type    type name
     * @param filters filter expression string
     */
    public void deleteByFilter(String index, String type, String filters) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        deleteByFilter(index, type, queryBuilder);
    }
    /**
     * Deletes all documents matching a query: first collects the matching ids,
     * then issues one bulk delete.
     *
     * @param index        index name
     * @param type         type name
     * @param queryBuilder query selecting the documents to delete
     */
    public void deleteByFilter(String index, String type, QueryBuilder queryBuilder) {
        List<String> idList = getIds(index, type, queryBuilder);
        if (idList.size() > 0) {
            TransportClient transportClient = elasticSearchPool.getClient();
            String [] idArr = new String[idList.size()];
            idArr = idList.toArray(idArr);
            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
            for (String id : idArr) {
                bulkRequestBuilder.add(transportClient.prepareDelete(index, type, id));
            }
            bulkRequestBuilder.get();
        }
    }
    /**
     * Partially updates a document and returns the latest stored version.
     *
     * @param index  index name
     * @param type   type name
     * @param id     document id
     * @param source partial document (mutated: "_id" removed)
     * @return the freshly re-read document, with "_id" set
     * @throws DocumentMissingException if no document with {@code id} exists
     */
    public Map<String, Object> update(String index, String type, String id, Map<String, Object> source) throws DocumentMissingException {
        TransportClient transportClient = elasticSearchPool.getClient();
        source.remove("_id");
        // Retry up to 5 times on version conflicts from concurrent updates.
        transportClient.prepareUpdate(index, type, id).setDoc(source).setRetryOnConflict(5).get();
        return findById(index, type, id);
    }
    /**
     * Partially updates a document without re-reading it.
     *
     * @param index  index name
     * @param type   type name
     * @param id     document id
     * @param source partial document (mutated: "_id" removed)
     * @throws DocumentMissingException if no document with {@code id} exists
     */
    public void voidUpdate (String index, String type, String id, Map<String, Object> source) throws DocumentMissingException {
        TransportClient transportClient = elasticSearchPool.getClient();
        source.remove("_id");
        transportClient.prepareUpdate(index, type, id).setDoc(source).setRetryOnConflict(5).get();
    }
    /**
     * Bulk partial update. Items without an "_id" entry are silently skipped
     * (there is no id to address the update to).
     *
     * @param index  index name
     * @param type   type name
     * @param source partial documents, each carrying "_id" (maps are mutated)
     * @throws DocumentMissingException if a referenced document does not exist
     */
    public void bulkUpdate(String index, String type, List<Map<String, Object>> source) throws DocumentMissingException {
        if (source.size() > 0) {
            TransportClient transportClient = elasticSearchPool.getClient();
            BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
            source.forEach(item -> {
                String _id = (String)item.remove("_id");
                if (!StringUtils.isEmpty(_id)) {
                    bulkRequestBuilder.add(transportClient.prepareUpdate(index, type, _id).setDoc(item).setRetryOnConflict(5));
                }
            });
            bulkRequestBuilder.get();
        }
    }
    /**
     * Fetches a document by id.
     *
     * @param index index name
     * @param type  type name
     * @param id    document id
     * @return the document source with "_id" added, or null when not found
     */
    public Map<String, Object> findById(String index, String type, String id) {
        TransportClient transportClient = elasticSearchPool.getClient();
        GetRequest getRequest = new GetRequest(index, type, id);
        GetResponse response = transportClient.get(getRequest).actionGet();
        Map<String, Object> source = response.getSource();
        if (source != null) {
            source.put("_id", response.getId());
        }
        return source;
    }
    /**
     * Fetches all documents whose {@code field} equals {@code value}.
     *
     * @param index index name
     * @param type  type name
     * @param field field name
     * @param value value to match (term query)
     * @return matching documents, each with "_id" added
     */
    public List<Map<String, Object>> findByField(String index, String type, String field, Object value) {
        return list(index, type, field + "=" + value);
    }
    /**
     * Fetches all documents matching a filter expression (see class javadoc).
     *
     * @param index   index name
     * @param type    type name
     * @param filters filter expression string
     * @return matching documents, each with "_id" added
     */
    public List<Map<String, Object>> list(String index, String type, String filters) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        return list(index, type, queryBuilder);
    }
    /**
     * Fetches ALL documents matching a query by first counting the hits and
     * then requesting exactly that many in one page.
     *
     * @param index        index name
     * @param type         type name
     * @param queryBuilder query to run
     * @return matching documents, each with "_id" added
     */
    public List<Map<String, Object>> list(String index, String type, QueryBuilder queryBuilder) {
        // NOTE(review): the long hit count is narrowed to int; result sets
        // beyond Integer.MAX_VALUE (and the index's max_result_window) will
        // not be fully returned.
        int size = (int)count(index, type, queryBuilder);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, 0, size);
        SearchResponse response = builder.get();
        SearchHits hits = response.getHits();
        List<Map<String, Object>> resultList = new ArrayList<Map<String, Object>>();
        for (SearchHit hit : hits.getHits()) {
            Map<String, Object> source = hit.getSource();
            source.put("_id", hit.getId());
            resultList.add(source);
        }
        return resultList;
    }
    /**
     * Returns one page of matching documents, unsorted.
     *
     * @param index   index name
     * @param type    type name
     * @param filters filter expression string
     * @param page    1-based page number
     * @param size    page size
     * @return the requested page
     */
    public Page<Map<String, Object>> page(String index, String type, String filters, int page, int size) {
        return page(index, type, filters, null, page, size);
    }
    /**
     * Returns one page of matching documents with sorting.
     *
     * @param index   index name
     * @param type    type name
     * @param filters filter expression string
     * @param sorts   sort expression string (see class javadoc)
     * @param page    1-based page number
     * @param size    page size
     * @return the requested page
     */
    public Page<Map<String, Object>> page(String index, String type, String filters, String sorts, int page, int size) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        List<SortBuilder> sortBuilders = getSortBuilder(sorts);
        return page(index, type, queryBuilder, sortBuilders, page, size);
    }
    /**
     * Returns one page of matching documents with sorting.
     *
     * @param index        index name
     * @param type         type name
     * @param queryBuilder query to run
     * @param sortBuilders sort clauses; may be null
     * @param page         1-based page number
     * @param size         page size
     * @return the requested page backed by the total hit count
     */
    public Page<Map<String, Object>> page(String index, String type, QueryBuilder queryBuilder, List<SortBuilder> sortBuilders, int page, int size) {
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, sortBuilders, (page - 1) * size, size);
        SearchResponse response = builder.get();
        SearchHits hits = response.getHits();
        List<Map<String, Object>> resultList = new ArrayList<>();
        for (SearchHit hit : hits.getHits()) {
            Map<String, Object> source = hit.getSource();
            source.put("_id", hit.getId());
            resultList.add(source);
        }
        // Spring Data pages are 0-based, hence page - 1.
        return new PageImpl<>(resultList, new PageRequest(page - 1, size), hits.totalHits());
    }
    /**
     * Returns the ids of all documents matching a filter expression.
     *
     * @param index   index name
     * @param type    type name
     * @param filters filter expression string
     * @return matching document ids
     */
    public List<String> getIds (String index, String type, String filters){
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        return getIds(index, type, queryBuilder);
    }
    /**
     * Returns the ids of all documents matching a query.
     *
     * @param index        index name
     * @param type         type name
     * @param queryBuilder query to run
     * @return matching document ids
     */
    public List<String> getIds (String index, String type, QueryBuilder queryBuilder) {
        int size = (int)count(index, type, queryBuilder);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, 0, size);
        SearchResponse response = builder.get();
        SearchHits hits = response.getHits();
        List<String> resultList = new ArrayList<>();
        for (SearchHit hit : hits.getHits()) {
            resultList.add(hit.getId());
        }
        return resultList;
    }
    /**
     * Counts documents matching a filter expression.
     *
     * @param index   index name
     * @param type    type name
     * @param filters filter expression string
     * @return total hit count
     */
    public long count(String index, String type, String filters) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        return count(index, type, queryBuilder);
    }
    /**
     * Counts documents matching a query.
     *
     * @param index        index name
     * @param type         type name
     * @param queryBuilder query to run
     * @return total hit count
     */
    public long count(String index, String type, QueryBuilder queryBuilder) {
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, null, null);
        return builder.get().getHits().totalHits();
    }
    /**
     * Runs an elasticsearch-sql query and projects the named columns.
     * Best-effort: on failure an empty list is returned instead of throwing
     * (preserved from the original implementation).
     *
     * @param field column names to read from each row
     * @param sql   SQL statement (elasticsearch-sql dialect)
     * @return one map per row, keyed by the requested columns
     * @throws Exception if closing a resource fails
     */
    public List<Map<String, Object>> findBySql(List<String> field, String sql) throws Exception {
        List<Map<String, Object>> list = new ArrayList<>();
        DruidDataSource druidDataSource = null;
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            druidDataSource = elasticSearchPool.getDruidDataSource();
            connection = druidDataSource.getConnection();
            preparedStatement = connection.prepareStatement(sql);
            resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                Map<String, Object> rowData = new HashMap<>();
                for (String _field : field) {
                    rowData.put(_field, resultSet.getObject(_field));
                }
                list.add(rowData);
            }
            return list;
        } catch (Exception e) {
            // Deliberate best-effort: swallow and return empty. The "Error"
            // message filter suppresses a known noisy failure — TODO confirm
            // which driver error this targets.
            if (!"Error".equals(e.getMessage())){
                e.printStackTrace();
            }
            return new ArrayList<>();
        } finally {
            if (resultSet != null) {
                resultSet.close();
            }
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (connection != null) {
                connection.close();
            }
            if (druidDataSource != null) {
                druidDataSource.close();
            }
        }
    }
    /**
     * Runs an elasticsearch-sql query and returns the raw open ResultSet.
     *
     * <p>FIX: the previous implementation closed the ResultSet, statement,
     * connection and pool in {@code finally} and then returned the already
     * closed ResultSet, which was unusable to every caller. Resources are now
     * released only when the query fails; on success the caller OWNS the
     * result and must close it, e.g.
     * {@code rs.getStatement().getConnection().close()} after closing the
     * ResultSet. Note a fresh DruidDataSource is created per call and cannot
     * be reached from the ResultSet — NOTE(review): consider caching the pool
     * in ElasticSearchPool to avoid this.
     *
     * @param sql SQL statement (elasticsearch-sql dialect)
     * @return an OPEN ResultSet the caller must close
     * @throws Exception if creating the pool/connection or executing fails
     */
    public ResultSet findBySql(String sql) throws Exception {
        DruidDataSource druidDataSource = null;
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            druidDataSource = elasticSearchPool.getDruidDataSource();
            connection = druidDataSource.getConnection();
            preparedStatement = connection.prepareStatement(sql);
            return preparedStatement.executeQuery();
        } catch (Exception e) {
            // Release everything acquired so far, then surface the failure.
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (connection != null) {
                connection.close();
            }
            if (druidDataSource != null) {
                druidDataSource.close();
            }
            throw e;
        }
    }
    /**
     * Buckets matching documents into a date histogram over [start, end].
     *
     * @param index    index name
     * @param type     type name
     * @param filters  filter expression string
     * @param start    lower extended bound (inclusive)
     * @param end      upper extended bound (inclusive)
     * @param field    date field to bucket on
     * @param interval bucket interval
     * @param format   optional key format; skipped when empty
     * @return bucket key (formatted) -> document count
     */
    public Map<String, Long> dateHistogram(String index, String type, String filters, Date start, Date end, String field, DateHistogramInterval interval, String format) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, 0, 0);
        DateHistogramBuilder dateHistogramBuilder = new DateHistogramBuilder(index + "-" + field);
        dateHistogramBuilder.field(field);
        dateHistogramBuilder.interval(interval);
        if (!StringUtils.isEmpty(format)) {
            dateHistogramBuilder.format(format);
        }
        // minDocCount(0) + extendedBounds: emit empty buckets for the whole range.
        dateHistogramBuilder.minDocCount(0);
        dateHistogramBuilder.extendedBounds(start.getTime(), end.getTime());
        builder.addAggregation(dateHistogramBuilder);
        SearchResponse response = builder.get();
        Histogram histogram = response.getAggregations().get(index + "-" + field);
        Map<String, Long> temp = new HashMap<>();
        histogram.getBuckets().forEach(item -> temp.put(item.getKeyAsString(), item.getDocCount()));
        return temp;
    }
    /**
     * Counts distinct values of a field among matching documents
     * (cardinality aggregation — approximate by nature).
     *
     * @param index   index name
     * @param type    type name
     * @param filters filter expression string
     * @param filed   field whose distinct values are counted
     * @return approximate distinct-value count
     */
    public int cardinality(String index, String type, String filters, String filed){
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, 0, 0);
        CardinalityBuilder cardinality = AggregationBuilders.cardinality("cardinality").field(filed);
        builder.addAggregation(cardinality);
        SearchResponse response = builder.get();
        InternalCardinality internalCard = response.getAggregations().get("cardinality");
        // Parse instead of the deprecated new Double(String) boxing constructor.
        return (int) Double.parseDouble(internalCard.getProperty("value").toString());
    }
    /**
     * Counts matching documents per distinct value of {@code groupField}
     * (terms aggregation).
     *
     * @param index      index name
     * @param type       type name
     * @param filters    filter expression string
     * @param groupField field to group by
     * @return group key -> document count
     */
    public Map<String, Long> countByGroup(String index, String type, String filters, String groupField) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, null, null);
        AbstractAggregationBuilder aggregation = AggregationBuilders.terms("count").field(groupField);
        builder.addAggregation(aggregation);
        SearchResponse response = builder.get();
        Terms terms = response.getAggregations().get("count");
        List<Terms.Bucket> buckets = terms.getBuckets();
        Map<String, Long> groupMap = new HashMap<>();
        for (Terms.Bucket bucket : buckets) {
            groupMap.put(bucket.getKey().toString(), bucket.getDocCount());
        }
        return groupMap;
    }
    /**
     * Sums {@code sumField} per distinct value of {@code groupField}
     * (terms aggregation with a sum sub-aggregation).
     *
     * @param index      index name
     * @param type       type name
     * @param filters    filter expression string
     * @param sumField   numeric field to sum
     * @param groupField field to group by
     * @return group key -> sum of {@code sumField}
     */
    public Map<String, Double> sumByGroup(String index, String type, String filters, String sumField, String groupField) {
        QueryBuilder queryBuilder = getQueryBuilder(filters);
        SearchRequestBuilder builder = searchRequestBuilder(index, type, queryBuilder, null, null, null);
        TermsBuilder aggregation = AggregationBuilders.terms("sum_query").field(groupField);
        SumBuilder sumBuilder= AggregationBuilders.sum("sum_row").field(sumField);
        aggregation.subAggregation(sumBuilder);
        builder.addAggregation(aggregation);
        SearchResponse response = builder.get();
        Terms terms = response.getAggregations().get("sum_query");
        List<Terms.Bucket> buckets = terms.getBuckets();
        Map<String, Double> groupMap = new HashMap<>();
        for (Terms.Bucket bucket : buckets){
            Sum sum2 = bucket.getAggregations().get("sum_row");
            groupMap.put(bucket.getKey().toString(), sum2.getValue());
        }
        return groupMap;
    }
    /**
     * Builds the base search request shared by all query methods.
     *
     * @param index        index name
     * @param type         type name
     * @param queryBuilder query to run
     * @param sortBuilders sort clauses; skipped when null
     * @param from         result offset; skipped when null
     * @param size         result size; skipped when null
     * @return a configured SearchRequestBuilder
     */
    public SearchRequestBuilder searchRequestBuilder(String index, String type, QueryBuilder queryBuilder, List<SortBuilder> sortBuilders, Integer from, Integer size) {
        TransportClient transportClient = elasticSearchPool.getClient();
        SearchRequestBuilder builder = transportClient.prepareSearch(index);
        builder.setTypes(type);
        builder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
        builder.setQuery(queryBuilder);
        // NOTE(review): setExplain(true) computes a scoring explanation for
        // every hit and adds overhead; none of the methods in this class read
        // it. Kept for compatibility — confirm no external caller depends on
        // it before removing.
        builder.setExplain(true);
        if (sortBuilders != null) {
            sortBuilders.forEach(item -> builder.addSort(item));
        }
        if (from != null) {
            builder.setFrom(from);
        }
        if (size != null) {
            builder.setSize(size);
        }
        return builder;
    }
    /**
     * Parses a sort expression into sort clauses. Each {@code ;}-separated
     * entry is a field name prefixed with {@code +} (ascending) or {@code -}
     * (descending); any other prefix character falls back to descending.
     *
     * @param sorts sort expression; empty input yields an empty list
     * @return parsed sort clauses
     */
    public List<SortBuilder> getSortBuilder(String sorts) {
        List<SortBuilder> sortBuilderList = new ArrayList<>();
        if (StringUtils.isEmpty(sorts)) {
            return sortBuilderList;
        }
        String [] sortArr = sorts.split(";");
        for (String sort : sortArr) {
            // First character is the direction marker; the rest is the field name.
            String operator = sort.substring(0, 1);
            SortBuilder sortBuilder = new FieldSortBuilder(sort.substring(1));
            if ("-".equalsIgnoreCase(operator.trim())) {
                sortBuilder.order(SortOrder.DESC);
            } else if ("+".equalsIgnoreCase(operator.trim())) {
                sortBuilder.order(SortOrder.ASC);
            } else {
                sortBuilder.order(SortOrder.DESC);
            }
            sortBuilderList.add(sortBuilder);
        }
        return sortBuilderList;
    }
    /**
     * Parses a filter expression into a bool query. Clauses are separated by
     * {@code ;} and combined with {@code must}. Operators are tested in this
     * order (first match wins): {@code ||} should-group of term clauses,
     * {@code ?} match-phrase, {@code <>} must-not term/terms, {@code >=},
     * {@code >}, {@code <=}, {@code <} range, {@code =} term (or terms when
     * the value contains commas).
     *
     * @param filters filter expression; empty input yields a match-all bool query
     * @return the composed query
     */
    public QueryBuilder getQueryBuilder(String filters) {
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (StringUtils.isEmpty(filters)) {
            return boolQueryBuilder;
        }
        String [] filterArr = filters.split(";");
        for (String filter : filterArr) {
            if (filter.contains("||")){
                String [] fields = filter.split("\\|\\|");
                BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
                for (String filed : fields) {
                    String [] condition = filed.split("=");
                    queryBuilder.should(QueryBuilders.termQuery(condition[0], condition[1]));
                }
                boolQueryBuilder.must(queryBuilder);
            } else if (filter.contains("?")) {
                String [] condition = filter.split("\\?");
                MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchPhraseQuery(condition[0], condition[1]);
                boolQueryBuilder.must(matchQueryBuilder);
            } else if (filter.contains("<>")) {
                String [] condition = filter.split("<>");
                if (condition[1].contains(",")) {
                    String [] inCondition = condition[1].split(",");
                    TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(condition[0], inCondition);
                    boolQueryBuilder.mustNot(termsQueryBuilder);
                } else {
                    TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(condition[0], condition[1]);
                    boolQueryBuilder.mustNot(termQueryBuilder);
                }
            } else if (filter.contains(">=")) {
                String [] condition = filter.split(">=");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.gte(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains(">")) {
                String [] condition = filter.split(">");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.gt(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains("<=")) {
                String [] condition = filter.split("<=");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.lte(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains("<")) {
                String [] condition = filter.split("<");
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(condition[0]);
                rangeQueryBuilder.lt(condition[1]);
                boolQueryBuilder.must(rangeQueryBuilder);
            } else if (filter.contains("=")) {
                String [] condition = filter.split("=");
                if (condition[1].contains(",")) {
                    String [] inCondition = condition[1].split(",");
                    TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(condition[0], inCondition);
                    boolQueryBuilder.must(termsQueryBuilder);
                } else {
                    TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(condition[0], condition[1]);
                    boolQueryBuilder.must(termQueryBuilder);
                }
            }
        }
        return boolQueryBuilder;
    }
}

+ 45 - 0
elasticsearch-transport-starter/src/main/java/com/yihu/elasticsearch/transport/config/ElasticSearchConfig.java

@ -0,0 +1,45 @@
package com.yihu.elasticsearch.transport.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
 * Created by progr1mmer on 2017/12/1.
 */
@ConfigurationProperties(prefix = "elasticsearch")
@Configuration
public class ElasticSearchConfig {

    /** Cluster name, bound from {@code elasticsearch.cluster-name}. */
    private String clusterName;

    /** Comma-separated node list, bound from {@code elasticsearch.cluster-nodes}. */
    private String clusterNodes;

    public String getClusterName() {
        return clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public String getClusterNodes() {
        return clusterNodes;
    }

    public void setClusterNodes(String clusterNodes) {
        this.clusterNodes = clusterNodes;
    }

    /**
     * Echoes the resolved configuration to stdout once the properties
     * have been bound, so the effective values are visible at startup.
     */
    @PostConstruct
    private void configInfo() {
        String info = "{"
                + "\n  elasticsearch.cluster-name = " + clusterName
                + "\n  elasticsearch.cluster-nodes = " + clusterNodes
                + "\n}";
        System.out.println("Elasticsearch.configInfo : " + info);
    }
}

+ 50 - 0
hbase-starter/pom.xml

@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>jkzl-starter</artifactId>
        <groupId>com.yihu</groupId>
        <version>2.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>hbase-starter</artifactId>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-protocol</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-hbase</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>
</project>

+ 32 - 0
hbase-starter/src/main/java/com/yihu/hbase/AbstractHBaseClient.java

@ -0,0 +1,32 @@
package com.yihu.hbase;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
/**
 * AbstractHBaseClient - base class for HBase client components
 * @author hzp
 * @created 2017.05.03
 */
public abstract class AbstractHBaseClient {

    @Autowired
    protected HbaseTemplate hbaseTemplate;

    /**
     * Opens a new HBase connection using the autowired template's configuration.
     * The caller is responsible for closing the returned connection.
     */
    protected Connection getConnection() throws Exception {
        return getConnection(this.hbaseTemplate);
    }

    /**
     * Opens a new HBase connection from the given template's configuration.
     * The caller is responsible for closing the returned connection.
     */
    protected Connection getConnection(HbaseTemplate hbaseTemplate) throws Exception {
        return ConnectionFactory.createConnection(hbaseTemplate.getConfiguration());
    }
}

+ 213 - 0
hbase-starter/src/main/java/com/yihu/hbase/HBaseAdmin.java

@ -0,0 +1,213 @@
package com.yihu.hbase;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
 * Service - HBase DDL
 * @author hzp && Progr1mmer
 * @created 2017.05.03
 * @modified 2017/11/23
 */
@Service
public class HBaseAdmin extends AbstractHBaseClient {

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Checks whether a table exists.
     *
     * @param tableName table name
     * @return true if the table exists
     * @throws Exception if the connection or admin call fails
     */
    public boolean isTableExists(String tableName) throws Exception {
        // try-with-resources closes Admin first, then Connection — the same
        // order the previous explicit finally blocks used.
        try (Connection connection = getConnection();
             Admin admin = connection.getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        }
    }

    /**
     * Creates a table with the given column families.
     * No-op if the table already exists.
     *
     * @param tableName      table name
     * @param columnFamilies column family names
     * @throws Exception if the connection or admin call fails
     */
    public void createTable(String tableName, String... columnFamilies) throws Exception {
        try (Connection connection = getConnection();
             Admin admin = connection.getAdmin()) {
            if (!admin.tableExists(TableName.valueOf(tableName))) {
                HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
                for (String fc : columnFamilies) {
                    tableDescriptor.addFamily(new HColumnDescriptor(fc));
                }
                admin.createTable(tableDescriptor);
            }
        }
    }

    /**
     * Lists table names matching a regular expression.
     *
     * @param regex            table-name pattern; null or empty lists all tables
     * @param includeSysTables whether system tables are included
     * @return matching table names
     * @throws Exception if the connection or admin call fails
     */
    public List<String> getTableList(String regex, boolean includeSysTables) throws Exception {
        List<String> tables = new ArrayList<>();
        try (Connection connection = getConnection();
             Admin admin = connection.getAdmin()) {
            TableName[] tableNames;
            if (regex == null || regex.length() == 0) {
                tableNames = admin.listTableNames();
            } else {
                tableNames = admin.listTableNames(regex, includeSysTables);
            }
            for (TableName tableName : tableNames) {
                tables.add(tableName.getNameAsString());
            }
            return tables;
        }
    }

    /**
     * Clears a batch of tables by dropping and recreating each one with
     * its original descriptor (faster than deleting rows one by one).
     *
     * @param tables table names to clear
     * @throws Exception if the connection or admin call fails
     */
    public void cleanTable(List<String> tables) throws Exception {
        try (Connection connection = getConnection();
             Admin admin = connection.getAdmin()) {
            for (String tableName : tables) {
                TableName tn = TableName.valueOf(tableName);
                if (admin.tableExists(tn)) {
                    // Snapshot the descriptor before dropping so the table
                    // can be recreated with identical column families.
                    HTableDescriptor descriptor = admin.getTableDescriptor(tn);
                    admin.disableTable(tn);
                    admin.deleteTable(tn);
                    admin.createTable(descriptor);
                } else {
                    System.out.print("not exist table " + tableName + ".\r\n");
                }
            }
        }
    }

    /**
     * Drops a table (disable + delete).
     *
     * @param tableName table name
     * @throws Exception if the connection or admin call fails
     */
    public void dropTable(String tableName) throws Exception {
        try (Connection connection = getConnection();
             Admin admin = connection.getAdmin()) {
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
        }
    }

    /**
     * Returns the table's column-family layout as a JSON object whose keys
     * are the family index ("0", "1", ...) and values the family name.
     *
     * @param tableName table name
     * @return family metadata, or null if the table does not exist
     * @throws Exception if the connection or admin call fails
     */
    public ObjectNode getTableMetaData(String tableName) throws Exception {
        try (Connection connection = getConnection();
             Admin admin = connection.getAdmin()) {
            TableName tn = TableName.valueOf(tableName);
            if (admin.tableExists(tn)) {
                ObjectNode objectNode = objectMapper.createObjectNode();
                HTableDescriptor tableDescriptor = admin.getTableDescriptor(tn);
                HColumnDescriptor[] columnDescriptors = tableDescriptor.getColumnFamilies();
                for (int i = 0; i < columnDescriptors.length; ++i) {
                    HColumnDescriptor columnDescriptor = columnDescriptors[i];
                    objectNode.put(Integer.toString(i), Bytes.toString(columnDescriptor.getName()));
                }
                return objectNode;
            }
            return null;
        }
    }
}

+ 392 - 0
hbase-starter/src/main/java/com/yihu/hbase/HBaseDao.java

@ -0,0 +1,392 @@
package com.yihu.hbase;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.*;
/**
 * HBase - DML
 * @Author Progr1mmer
 */
@Service
public class HBaseDao extends AbstractHBaseClient {

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Inserts one row spanning multiple column families.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param family    familyName -> (qualifier -> value)
     */
    public void add(String tableName, String rowKey, Map<String, Map<String, String>> family) {
        hbaseTemplate.execute(tableName, (table) -> {
            Put p = new Put(rowKey.getBytes());
            for (String familyName : family.keySet()) {
                Map<String, String> map = family.get(familyName);
                for (String qualifier : map.keySet()) {
                    String value = map.get(qualifier);
                    p.addColumn(familyName.getBytes(), qualifier.getBytes(), value.getBytes());
                }
            }
            table.put(p);
            return null;
        });
    }

    /**
     * Inserts one row into a single column family.
     * Null values are skipped (the column is not written).
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param family    column family
     * @param columns   column qualifiers, parallel to {@code values}
     * @param values    values, parallel to {@code columns}
     */
    public void add(String tableName, String rowKey, String family, Object[] columns, Object[] values) {
        hbaseTemplate.execute(tableName, new TableCallback<Void>() {
            @Override
            public Void doInTable(HTableInterface table) throws Throwable {
                Put put = new Put(Bytes.toBytes(rowKey));
                for (int j = 0; j < columns.length; j++) {
                    // Skip null values so empty fields are not persisted.
                    if (values[j] != null) {
                        String column = String.valueOf(columns[j]);
                        String value = String.valueOf(values[j]);
                        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
                    }
                }
                table.put(put);
                return null;
            }
        });
    }

    /**
     * Deletes one row by row key.
     */
    public void delete(String tableName, String rowKey)  {
        hbaseTemplate.execute(tableName, new TableCallback<Void>() {
            @Override
            public Void doInTable(HTableInterface table) throws Throwable {
                Delete d = new Delete(rowKey.getBytes());
                table.delete(d);
                return null;
            }
        });
    }

    /**
     * Deletes a batch of rows in a single round trip.
     *
     * @param tableName table name
     * @param rowKeys   row keys to delete
     * @return per-operation results from {@code Table.batch}
     */
    public Object[] deleteBatch(String tableName, String[] rowKeys) {
        return hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
            @Override
            public Object[] doInTable(HTableInterface table) throws Throwable {
                List<Delete> deletes = new ArrayList<>(rowKeys.length);
                for (String rowKey : rowKeys) {
                    Delete delete = new Delete(Bytes.toBytes(rowKey));
                    deletes.add(delete);
                }
                Object[] results = new Object[deletes.size()];
                table.batch(deletes, results);
                return results;
            }
        });
    }

    /**
     * Deletes an entire column family of one row.
     */
    public void deleteFamily(String tableName, String rowKey, String familyName) throws Exception {
        hbaseTemplate.delete(tableName, rowKey, familyName);
    }

    /**
     * Deletes a single column of one row.
     */
    public void deleteColumn(String tableName, String rowKey, String familyName, String columnName) throws Exception {
        hbaseTemplate.delete(tableName, rowKey, familyName, columnName);
    }

    /**
     * Writes (inserts or overwrites) a single cell value.
     */
    public void put(String tableName, String rowKey, String familyName, String qualifier, String value) throws Exception {
        hbaseTemplate.put(tableName, rowKey, familyName, qualifier, value.getBytes());
    }

    /**
     * Finds row keys in [startRow, stopRow) whose key matches a regex.
     * NOTE(review): scans only rows that have the "basic" family — TODO confirm
     * this is intended for tables without that family.
     *
     * @param tableName   table name
     * @param startRow    scan start row (inclusive)
     * @param stopRow     scan stop row (exclusive)
     * @param rowKeyRegEx regular expression applied to the row key
     * @return matching row keys
     */
    public String[] findRowKeys(String tableName, String startRow, String stopRow, String rowKeyRegEx) throws Exception {
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("basic"));
        scan.setStartRow(startRow.getBytes());
        scan.setStopRow(stopRow.getBytes());
        scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(rowKeyRegEx)));
        List<String> list = new LinkedList<>();
        hbaseTemplate.find(tableName, scan, new RowMapper<Void>() {
            @Override
            public Void mapRow(Result result, int rowNum) throws Exception {
                list.add(Bytes.toString(result.getRow()));
                return null;
            }
        });
        return list.toArray(new String[list.size()]);
    }

    /**
     * Counts the rows of a table (full scan over the "basic" family).
     * Uses a counter instead of materializing every row key in memory.
     *
     * @param tableName table name
     * @return number of rows scanned
     */
    public Integer count(String tableName) throws Exception {
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("basic"));
        scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^")));
        final int[] count = {0};
        hbaseTemplate.find(tableName, scan, new RowMapper<Void>() {
            @Override
            public Void mapRow(Result result, int rowNum) throws Exception {
                count[0]++;
                return null;
            }
        });
        return count[0];
    }

    /**
     * Fetches one row by row key and renders it as a JSON string
     * ("rowkey" plus every qualifier -> value, without family prefixes).
     *
     * @return JSON string, or "" if the row does not exist
     */
    public String get(String tableName, String rowKey) {
        return hbaseTemplate.get(tableName, rowKey, new RowMapper<String>() {
            @Override
            public String mapRow(Result result, int rowNum) throws Exception {
                if(!result.isEmpty()) {
                    List<Cell> ceList = result.listCells();
                    Map<String, Object> map = new HashMap<String, Object>();
                    map.put("rowkey", rowKey);
                    for (Cell cell : ceList) {
                        // Family prefix intentionally omitted:
                        // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_"
                        map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                                Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                    }
                    return objectMapper.writeValueAsString(map);
                }
                else{
                    return "";
                }
            }
        });
    }

    /**
     * Fetches one row by row key as a Map
     * ("rowkey" plus every qualifier -> value, without family prefixes).
     *
     * @return map of the row's cells, or null if the row does not exist
     */
    public Map<String, Object> getResultMap(String tableName, String rowKey) {
        return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Object>>() {
            @Override
            public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {
                if(!result.isEmpty()) {
                    List<Cell> ceList = result.listCells();
                    Map<String, Object> map = new HashMap<String, Object>();
                    map.put("rowkey", rowKey);
                    for (Cell cell : ceList) {
                        // Family prefix intentionally omitted:
                        // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_"
                        map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                                Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                    }
                    return map;
                }else {
                    return null;
                }
            }
        });
    }

    /**
     * Fetches one row by row key as a raw HBase {@link Result}.
     */
    public Result getResult(String tableName, String rowKey) throws Exception {
        return hbaseTemplate.get(tableName, rowKey, new RowMapper<Result>() {
            @Override
            public Result mapRow(Result result, int rowNum) throws Exception {
                return result;
            }
        });
    }

    /**
     * Fetches all qualifier -> value pairs of one column family for a row.
     *
     * @param tableName  table name
     * @param rowKey     row key
     * @param familyName column family
     * @return qualifier -> value map (empty when the family has no cells)
     */
    public Map<String, String> get(String tableName, String rowKey, String familyName) {
        return hbaseTemplate.get(tableName, rowKey, familyName, new RowMapper<Map<String, String>>(){
            @Override
            public Map<String, String> mapRow(Result result, int rowNum) throws Exception {
                Map<String, String> map = new HashMap<>();
                NavigableMap<byte[], byte[]> navigableMaps = result.getFamilyMap(familyName.getBytes());
                if(null != navigableMaps) {
                    for (byte[] key : navigableMaps.keySet()) {
                        String keys = new String(key);
                        String values = new String(navigableMaps.get(key));
                        map.put(keys, values);
                    }
                }
                return map;
            }
        });
    }

    /**
     * Fetches a single cell value.
     *
     * @param tableName  table name
     * @param rowKey     row key
     * @param familyName column family
     * @param qualifier  column qualifier
     * @return the cell value, or null when the cell does not exist
     *         (previously this threw a NullPointerException)
     */
    public String get(String tableName, String rowKey, String familyName, String qualifier) {
        return hbaseTemplate.get(tableName, rowKey, familyName, qualifier, new RowMapper<String>(){
            @Override
            public String mapRow(Result result, int rowNum) throws Exception {
                Cell cell = result.getColumnLatestCell(familyName.getBytes(), qualifier.getBytes());
                if (cell == null) {
                    // Row or column absent — report "no value" instead of NPE-ing.
                    return null;
                }
                return new String(CellUtil.cloneValue(cell));
            }
        });
    }

    /**
     * Batch-fetches rows, restricted to the given columns of the
     * "basic" and "d" families.
     *
     * @param tableName table name
     * @param rowKeys   row keys to fetch
     * @param basicFl   comma-separated qualifiers of the "basic" family (may be empty)
     * @param dFl       comma-separated qualifiers of the "d" family (may be empty)
     * @return one Result per requested row key
     */
    public Result[] getResultList(String tableName, List<String> rowKeys, String basicFl, String dFl) {
        return hbaseTemplate.execute(tableName, new TableCallback<Result[]>() {
            @Override
            public Result[] doInTable(HTableInterface table) throws Throwable {
                List<Get> list = new ArrayList<Get>();
                for (String rowKey : rowKeys) {
                    Get get = new Get(Bytes.toBytes(rowKey));
                    if (!StringUtils.isEmpty(basicFl)) {
                        String[] basicArr = basicFl.split(",");
                        for (String basicStr : basicArr) {
                            get.addColumn(Bytes.toBytes("basic"), Bytes.toBytes(basicStr));
                        }
                    }
                    if (!StringUtils.isEmpty(dFl)) {
                        String[] dArr = dFl.split(",");
                        for (String dStr : dArr) {
                            get.addColumn(Bytes.toBytes("d"), Bytes.toBytes(dStr));
                        }
                    }
                    list.add(get);
                }
                return table.get(list);
            }
        });
    }

    /************************************* Bundle-based prototype operations **************************************/

    /**
     * Batch-saves all rows captured in a {@link TableBundle}.
     */
    public void save(String tableName, TableBundle tableBundle) {
        hbaseTemplate.execute(tableName, new TableCallback<Void>() {
            @Override
            public Void doInTable(HTableInterface table) throws Throwable {
                List<Put> puts = tableBundle.putOperations();
                Object[] results = new Object[puts.size()];
                table.batch(puts, results);
                return null;
            }
        });
    }

    /**
     * Batch-deletes all rows captured in a {@link TableBundle}.
     */
    public void delete(String tableName, TableBundle tableBundle) {
        hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
            @Override
            public Object[] doInTable(HTableInterface table) throws Throwable {
                List<Delete> deletes = tableBundle.deleteOperations();
                Object[] results = new Object[deletes.size()];
                table.batch(deletes, results);
                return null;
            }
        });
    }

    /**
     * Batch-fetches all rows captured in a {@link TableBundle}.
     *
     * @return the batch results, or null when the first row was not found
     */
    public Object[] get(String tableName, TableBundle tableBundle) {
        return hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
            @Override
            public Object[] doInTable(HTableInterface table) throws Throwable {
                List<Get> gets = tableBundle.getOperations();
                Object[] results = new Object[gets.size()];
                table.batch(gets, results);
                if (results.length > 0 && results[0].toString().equals("keyvalues=NONE")) {
                    return null;
                }
                return results;
            }
        });
    }
}

+ 158 - 0
hbase-starter/src/main/java/com/yihu/hbase/TableBundle.java

@ -0,0 +1,158 @@
package com.yihu.hbase;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.*;
import java.util.stream.Collectors;
/**
 * 将HBase中的行,列族,列捆绑成一束。并一次性生成所需要的Get, Put操作。
 * <p>
 * 仅支持单表操作。
 * <p>
 * 虽然支持多种HBase操作,但请注意,一次只能用于一种操作,如:Get,Put,Delete不能混用,
 * 否则将出现难以预料的后果。
 *
 * @author Sand
 * @created 2016.04.27 14:38
 */
public class TableBundle {

    /** rowKey -> Row; a null value means "whole row" (registered via addRows). */
    Map<String, Row> rows = new HashMap<>();

    /**
     * Registers whole rows (no family/column restriction).
     */
    public void addRows(String... rowKeys) {
        for (String rowKey : rowKeys) {
            rows.put(rowKey, null);
        }
    }

    /**
     * Returns the Row for a key, creating (and registering) it on demand.
     */
    private Row getRow(String rowKey) {
        Row row = rows.get(rowKey);
        if (row == null) {
            row = new Row();
            rows.put(rowKey, row);
        }
        return row;
    }

    /**
     * Restricts a row to a whole column family.
     */
    public void addFamily(String rowKey, Object family) {
        Row row = getRow(rowKey);
        row.addFamily(family.toString());
    }

    /**
     * Restricts a row to specific columns of a family (for Get operations).
     */
    public void addColumns(String rowKey, Object family, String[] columns) {
        Row row = getRow(rowKey);
        row.addColumns(family.toString(), columns);
    }

    /**
     * Adds column values for a row (for Put operations).
     */
    public void addValues(String rowKey, Object family, Map<String, String> values) {
        Row row = getRow(rowKey);
        row.addValues(family.toString(), values);
    }

    public void clear() {
        rows.clear();
    }

    /**
     * Builds one Get per registered row.
     */
    public List<Get> getOperations() {
        List<Get> gets = new ArrayList<>(rows.size());
        for (String rowKey : rows.keySet()) {
            Get get = new Get(Bytes.toBytes(rowKey));
            Row row = rows.get(rowKey);
            if (row != null) {
                for (String family : row.getFamilies()) {
                    Set<Object> columns = row.getCells(family);
                    if (CollectionUtils.isEmpty(columns)) {
                        // Family registered without columns — fetch the whole
                        // family. `columns` may be null here, so we must not
                        // fall through to the iteration below (was an NPE).
                        get.addFamily(Bytes.toBytes(family));
                        continue;
                    }
                    for (Object column : columns) {
                        get.addColumn(Bytes.toBytes(family), Bytes.toBytes((String) column));
                    }
                }
            }
            gets.add(get);
        }
        return gets;
    }

    /**
     * Builds one Put per registered row; empty values are skipped.
     */
    public List<Put> putOperations() {
        List<Put> puts = new ArrayList<>(rows.values().size());
        for (String rowKey : rows.keySet()) {
            Put put = new Put(Bytes.toBytes(rowKey));
            Row row = rows.get(rowKey);
            if (row == null) {
                // Row registered via addRows() carries no values — nothing to
                // put for it (previously this dereferenced null).
                continue;
            }
            for (String family : row.getFamilies()) {
                Set<Object> columns = row.getCells(family);
                if (columns == null) {
                    continue; // family registered without values
                }
                for (Object column : columns) {
                    Pair<String, String> pair = (Pair<String, String>) column;
                    if (StringUtils.isNotEmpty(pair.getRight())) {
                        put.addColumn(Bytes.toBytes(family),
                                Bytes.toBytes(pair.getLeft()),
                                Bytes.toBytes(pair.getRight()));
                    }
                }
            }
            puts.add(put);
        }
        return puts;
    }

    /**
     * Builds one whole-row Delete per registered row.
     */
    public List<Delete> deleteOperations() {
        List<Delete> deletes = new ArrayList<>(rows.values().size());
        for (String rowkey : rows.keySet()) {
            Delete delete = new Delete(Bytes.toBytes(rowkey));
            deletes.add(delete);
        }
        return deletes;
    }

    /**
     * One HBase row: family -> cells. A null cell set means "whole family".
     * Cells are either column names (String, for Get) or column/value pairs
     * (Pair, for Put) — do not mix the two in one bundle.
     */
    public static class Row {
        private Map<String, Set<Object>> cells = new HashMap<>();   // key is family, value is its columns

        public void addFamily(String family) {
            cells.put(family, null);
        }

        public void addColumns(String family, String... columns) {
            Set<Object> value = getFamily(family);
            for (String column : columns) {
                value.add(column);
            }
        }

        public void addValues(String family, Map<String, String> values) {
            Set<Object> value = getFamily(family);
            value.addAll(values.keySet().stream().map(key -> new ImmutablePair<>(key, values.get(key))).collect(Collectors.toList()));
        }

        public Set<String> getFamilies() {
            return cells.keySet();
        }

        public Set<Object> getCells(String family) {
            return cells.get(family);
        }

        /**
         * Returns the cell set for a family, creating it on demand.
         */
        private Set<Object> getFamily(String family) {
            Set<Object> value = cells.get(family);
            if (value == null) {
                value = new TreeSet<>();
                cells.put(family, value);
            }
            return value;
        }
    }
}

+ 89 - 0
hbase-starter/src/main/java/com/yihu/hbase/config/HbaseConfig.java

@ -0,0 +1,89 @@
package com.yihu.hbase.config;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
 * @author Sand
 * @version 1.0
 * @created 2015.11.28 16:26
 * @modified by Progr1mmer 2017/11/23 注释启动连接代码
 */
@Configuration
@ConfigurationProperties(prefix = "hadoop")
public class HbaseConfig{

    /** Raw properties bound from hadoop.hbase-properties.* */
    private Map<String, String> hbaseProperties = new HashMap<>();

    /** Hadoop user settings bound from hadoop.user.* */
    private User user = new User();

    public Map<String, String> getHbaseProperties(){
        return this.hbaseProperties;
    }

    public void setHbaseProperties(Map<String, String> hbaseProperties) {
        this.hbaseProperties = hbaseProperties;
    }

    public User getUser() {
        return user;
    }

    public void setUser(User user) {
        this.user = user;
    }

    public class User {
        private String name;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }

    /**
     * Echoes the bound configuration to stdout once properties are set.
     */
    @PostConstruct
    private void configInfo() {
        StringBuilder info = new StringBuilder("{");
        for (Map.Entry<String, String> entry : hbaseProperties.entrySet()) {
            info.append("\n  hadoop." + entry.getKey() + " = " + entry.getValue());
        }
        info.append("\n  hadoop.user.name = " + user.getName());
        info.append("\n}");
        System.out.println("Hbase.configInfo : " + info.toString());
    }

    /**
     * Builds the Hadoop configuration from the bound properties.
     * A numeric ordering prefix on a key (e.g. "01.hbase.zookeeper.quorum")
     * is stripped before the key is applied.
     */
    @Bean
    public org.apache.hadoop.conf.Configuration configuration() {
        // Normalize keys in place: drop a leading 1-2 digit "NN." prefix.
        for (String key : new HashSet<>(hbaseProperties.keySet())) {
            String value = hbaseProperties.remove(key);
            hbaseProperties.put(key.replaceAll("^\\d{1,2}\\.", ""), value);
        }
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
        for (Map.Entry<String, String> entry : hbaseProperties.entrySet()) {
            if (entry.getValue() != null) {
                configuration.set(entry.getKey(), entry.getValue());
            }
        }
        return configuration;
    }

    /**
     * HbaseTemplate bean; also sets HADOOP_USER_NAME (defaults to "root").
     */
    @Bean
    public HbaseTemplate hbaseTemplate(org.apache.hadoop.conf.Configuration configuration){
        String hadoopUser = user.getName() != null ? user.getName() : "root";
        System.setProperty("HADOOP_USER_NAME", hadoopUser);
        HbaseTemplate template = new HbaseTemplate();
        template.setConfiguration(configuration);
        return template;
    }
}

+ 76 - 81
mysql-starter/src/main/java/com/yihu/mysql/query/BaseJpaService.java

@ -57,8 +57,14 @@ public class BaseJpaService<T, R> {
        return (T) getRepository().save(entity);
    }
    public T retrieve(Serializable id) {
        return (T) getRepository().findOne(id);
    public void batchInsert(List list) {
        for (int i = 0; i < list.size(); i++) {
            entityManager.persist(list.get(i));
            if (i % 30 == 0) {
                entityManager.flush();
                entityManager.clear();
            }
        }
    }
    public void delete(Serializable id) {
@ -74,28 +80,15 @@ public class BaseJpaService<T, R> {
        getRepository().delete(list);
    }
    public Class<T> getEntityClass() {
        Type genType = this.getClass().getGenericSuperclass();
        Type[] parameters = ((ParameterizedType) genType).getActualTypeArguments();
        return (Class) parameters[0];
    public int delete(Object[] ids){
        String hql = " DELETE FROM " + getEntityClass().getName() + " WHERE " + getEntityIdFiled() + " in(:ids)";
        Query query = currentSession().createQuery(hql);
        query.setParameterList("ids", ids);
        return query.executeUpdate();
    }
    public List search(String fields, String filters, String sorts, Integer page, Integer size) throws ParseException {
        URLQueryParser queryParser = createQueryParser(fields, filters, sorts);
        CriteriaQuery query = queryParser.makeCriteriaQuery();
        if (page == null || page <= 0) {
            page = defaultPage;
        }
        if (size == null || size <= 0 || size > 10000) {
            size = defaultSize;
        }
        return entityManager
                .createQuery(query)
                .setFirstResult((page - 1) * size)
                .setMaxResults(size)
                .getResultList();
    public T retrieve(Serializable id) {
        return (T) getRepository().findOne(id);
    }
    public List search(String filters) throws ParseException {
@ -124,6 +117,47 @@ public class BaseJpaService<T, R> {
                .getResultList();
    }
    public List search(String fields, String filters, String sorts, Integer page, Integer size) throws ParseException {
        URLQueryParser queryParser = createQueryParser(fields, filters, sorts);
        CriteriaQuery query = queryParser.makeCriteriaQuery();
        if (page == null || page <= 0) {
            page = defaultPage;
        }
        if (size == null || size <= 0 || size > 10000) {
            size = defaultSize;
        }
        return entityManager
                .createQuery(query)
                .setFirstResult((page - 1) * size)
                .setMaxResults(size)
                .getResultList();
    }
    public List<T> findByField(String field, Object value){
        return findByFields(
                new String[]{field},
                new Object[]{value}
        );
    }
    public List<T> findByFields(String[] fields, Object[] values){
        CriteriaBuilder criteriaBuilder = entityManager.getCriteriaBuilder();
        CriteriaQuery query = criteriaBuilder.createQuery(getEntityClass());
        Root<T> root = query.from(getEntityClass());
        List<Predicate> ls = new ArrayList<>();
        for(int i=0; i< fields.length; i++){
            if(values[i].getClass().isArray())
                ls.add(criteriaBuilder.in(root.get(fields[i]).in((Object[])values[i])));
            else
                ls.add(criteriaBuilder.equal(root.get(fields[i]), values[i]));
        }
        query.where(ls.toArray(new Predicate[ls.size()]));
        return entityManager
                .createQuery(query)
                .getResultList() ;
    }
    public long getCount(String filters) throws ParseException {
        URLQueryParser queryParser = createQueryParser(filters);
        CriteriaQuery query = queryParser.makeCriteriaCountQuery();
@ -131,6 +165,12 @@ public class BaseJpaService<T, R> {
        return (long) entityManager.createQuery(query).getSingleResult();
    }
    public Class<T> getEntityClass() {
        Type genType = this.getClass().getGenericSuperclass();
        Type[] parameters = ((ParameterizedType) genType).getActualTypeArguments();
        return (Class) parameters[0];
    }
    protected <T> URLQueryParser createQueryParser(String fields, String filters, String orders) {
        URLQueryParser queryParser = new URLQueryParser<T>(fields, filters, orders)
                .setEntityManager(entityManager)
@ -156,10 +196,8 @@ public class BaseJpaService<T, R> {
                    elem -> orderList.add(
                            elem.startsWith("+") ? new Sort.Order(Sort.Direction.ASC, elem.substring(1)):
                                    new Sort.Order(Sort.Direction.DESC, elem.substring(1))));
            return new Sort(orderList);
        }
        return null;
    }
@ -175,31 +213,6 @@ public class BaseJpaService<T, R> {
        return (JpaRepository) SpringApplicationContext.getService(repoClass);
    }
    public List<T> findByField(String field, Object value){
        return findByFields(
                new String[]{field},
                new Object[]{value}
        );
    }
    public List<T> findByFields(String[] fields, Object[] values){
        CriteriaBuilder criteriaBuilder = entityManager.getCriteriaBuilder();
        CriteriaQuery query = criteriaBuilder.createQuery(getEntityClass());
        Root<T> root = query.from(getEntityClass());
        List<Predicate> ls = new ArrayList<>();
        for(int i=0; i< fields.length; i++){
            if(values[i].getClass().isArray())
                ls.add(criteriaBuilder.in(root.get(fields[i]).in((Object[])values[i])));
            else
                ls.add(criteriaBuilder.equal(root.get(fields[i]), values[i]));
        }
        query.where(ls.toArray(new Predicate[ls.size()]));
        return entityManager
                .createQuery(query)
                .getResultList() ;
    }
    public String getClzName(){
        return getEntityClass().getName();
    }
@ -211,22 +224,21 @@ public class BaseJpaService<T, R> {
        return s;
    }
    public int delete(Object[] ids){
        String hql = " DELETE FROM "+getEntityClass().getName()+" WHERE "+getEntityIdFiled()+" in(:ids)";
        Query query = currentSession().createQuery(hql);
        query.setParameterList("ids", ids);
        return query.executeUpdate();
    }
    public void batchInsert(List list) {
        for (int i = 0; i < list.size(); i++) {
            entityManager.persist(list.get(i));
            if (i % 30 == 0) {
                entityManager.flush();
                entityManager.clear();
            }
    /**
     * 将实体转换为模型。
     *
     * @param source
     * @param targetCls
     * @param <T>
     * @return
     */
    public <T> T convertToModel(Object source, Class<T> targetCls) {
        if (source == null) {
            return null;
        }
        T target = BeanUtils.instantiate(targetCls);
        BeanUtils.copyProperties(source, target);
        return target;
    }
    /**
@ -246,23 +258,6 @@ public class BaseJpaService<T, R> {
        return targets;
    }
    /**
     * 将实体转换为模型。
     *
     * @param source
     * @param targetCls
     * @param <T>
     * @return
     */
    public <T> T convertToModel(Object source, Class<T> targetCls) {
        if (source == null) {
            return null;
        }
        T target = BeanUtils.instantiate(targetCls);
        BeanUtils.copyProperties(source, target);
        return target;
    }
    public String getCode() {
        return UUID.randomUUID().toString().replaceAll("-", "");
    }

+ 64 - 0
pom.xml

@ -23,6 +23,9 @@
        <module>mysql-starter</module>
        <module>commons/utils</module>
        <module>elasticsearch-jest-starter</module>
        <module>hbase-starter</module>
        <module>solr-starter</module>
        <module>elasticsearch-transport-starter</module>
    </modules>
    <!-- 项目发布的这个服务器 -->
@ -41,6 +44,7 @@
        <version.swagger-ui>2.7.0</version.swagger-ui>
        <version.mysql>5.1.45</version.mysql>
        <version.commons-dbcp2>2.1.1</version.commons-dbcp2>
        <version.commons-lang3>3.2.1</version.commons-lang3>
        <version.hibernate>5.0.12.Final</version.hibernate>
        <version.hibernate-validator>6.0.10.Final</version.hibernate-validator>
        <version.hibernate-jpa-api>1.0.0.Final</version.hibernate-jpa-api>
@ -48,6 +52,9 @@
        <verion.fastjson>1.2.17</verion.fastjson>
        <version.druid>1.0.15</version.druid>
        <version.jna>3.0.9</version.jna>
        <version.hbase-client>1.1.1</version.hbase-client>
        <version.spring-data-hadoop>2.2.0.RELEASE</version.spring-data-hadoop>
        <version.spring-data-solr>2.1.3.RELEASE</version.spring-data-solr>
    </properties>
    <dependencyManagement>
@ -132,6 +139,11 @@
                <artifactId>commons-dbcp2</artifactId>
                <version>${version.commons-dbcp2}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>${version.commons-lang3}</version>
            </dependency>
            <!-- Jackson library -->
            <dependency>
@ -149,6 +161,58 @@
                <artifactId>jackson-databind</artifactId>
                <version>${version.jackson}</version>
            </dependency>
            <!-- Hbase library -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${version.hbase-client}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-common</artifactId>
                <version>${version.hbase-client}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-protocol</artifactId>
                <version>${version.hbase-client}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.springframework.data</groupId>
                <artifactId>spring-data-hadoop-hbase</artifactId>
                <version>${version.spring-data-hadoop}</version>
            </dependency>
            <!-- Solr library -->
            <dependency>
                <groupId>org.springframework.data</groupId>
                <artifactId>spring-data-solr</artifactId>
                <version>${version.spring-data-solr}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

+ 30 - 0
solr-starter/pom.xml

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>jkzl-starter</artifactId>
        <groupId>com.yihu</groupId>
        <version>2.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>solr-starter</artifactId>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-solr</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>
    </dependencies>
</project>

+ 119 - 0
solr-starter/src/main/java/com/yihu/solr/SolrAdmin.java

@ -0,0 +1,119 @@
package com.yihu.solr;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Low-level Solr index maintenance: create, update and delete documents.
 *
 * @author hzp
 * @version 1.0
 * @created 2017.05.06
 */
@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class SolrAdmin {
    private static final Logger logger = LoggerFactory.getLogger(SolrAdmin.class);

    @Autowired
    private SolrPool pool;

    /**
     * Indexes a single document built from the given field map.
     * Date values must already be converted to a Solr-compatible format.
     *
     * @param core Solr core name
     * @param map  field name -> value
     * @return true when Solr reports success (status 0)
     */
    public Boolean create(String core, Map<String, Object> map) throws Exception {
        SolrClient client = pool.getConnection(core);
        SolrInputDocument doc = new SolrInputDocument();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            doc.addField(entry.getKey(), entry.getValue());
        }
        UpdateResponse re = client.add(doc);
        client.commit();
        // Solr signals success with status 0; the original condition was inverted.
        if (re.getStatus() == 0) {
            logger.info("create index cost " + re.getQTime());
            return true;
        } else {
            logger.warn("create index failed!");
            return false;
        }
    }

    /**
     * Updates a single field on the document(s) matching uniqueKey:uniqueKeyValue.
     */
    public Boolean update(String core, String uniqueKey, String uniqueKeyValue, String key, Object value) throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put(key, value);
        return update(core, uniqueKey + ":" + uniqueKeyValue, map);
    }

    /**
     * Updates multiple fields on every document matched by keyQuery.
     * NOTE(review): the replacement documents carry only the "rowkey" field
     * plus the updated fields — other stored fields are not copied over;
     * confirm the schema/atomic-update expectations rely on "rowkey".
     *
     * @return true on success or when no document matched; false when the
     *         update itself failed
     */
    public Boolean update(String core, String keyQuery, Map<String, Object> map) throws Exception {
        SolrClient client = pool.getConnection(core);
        QueryResponse qr = client.query(new SolrQuery(keyQuery));
        SolrDocumentList docs = qr.getResults();
        if (docs != null && docs.size() > 0) {
            List<SolrInputDocument> solrList = new ArrayList<>();
            for (SolrDocument doc : docs) {
                SolrInputDocument newItem = new SolrInputDocument();
                newItem.addField("rowkey", doc.get("rowkey"));
                for (Map.Entry<String, Object> entry : map.entrySet()) {
                    newItem.addField(entry.getKey(), entry.getValue());
                }
                solrList.add(newItem);
            }
            UpdateResponse re = client.add(solrList);
            client.commit();
            // Status 0 means success; the original condition was inverted.
            if (re.getStatus() == 0) {
                logger.info("update index cost " + re.getQTime());
                return true;
            } else {
                logger.warn("update index failed!");
                return false;
            }
        } else {
            logger.warn("Null result!");
        }
        return true;
    }

    /**
     * Deletes every document matched by keyQuery.
     *
     * @return true when Solr reports success (status 0)
     */
    public Boolean delete(String core, String keyQuery) throws Exception {
        SolrClient client = pool.getConnection(core);
        UpdateResponse de = client.deleteByQuery(keyQuery);
        client.commit();
        // Status 0 means success; the original condition was inverted.
        if (de.getStatus() == 0) {
            logger.info("delete index cost " + de.getQTime());
            return true;
        } else {
            logger.warn("delete index failed!");
            return false;
        }
    }
}

+ 47 - 0
solr-starter/src/main/java/com/yihu/solr/SolrPool.java

@ -0,0 +1,47 @@
package com.yihu.solr;
import com.yihu.solr.config.SolrConfig;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.data.solr.server.support.MulticoreSolrClientFactory;
import org.springframework.stereotype.Service;
/**
 * Solr connection pool: lazily creates one shared MulticoreSolrClientFactory
 * backed by a CloudSolrClient and hands out per-core clients from it.
 *
 * @author hzp
 * @version 1.0
 * @created 2016.04.26
 */
@Service
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class SolrPool {
    @Autowired
    private SolrConfig solrConfig;

    // Shared factory; volatile enables safe double-checked lazy initialization.
    private static volatile MulticoreSolrClientFactory factory;

    protected MulticoreSolrClientFactory getFactory() {
        MulticoreSolrClientFactory local = factory;
        if (local == null) {
            synchronized (MulticoreSolrClientFactory.class) {
                if (factory == null) {
                    CloudSolrClient client = new CloudSolrClient(solrConfig.getZkHost());
                    factory = new MulticoreSolrClientFactory(client);
                }
                local = factory;
            }
        }
        return local;
    }

    /**
     * Returns a SolrClient for the given core, creating the factory on first use.
     */
    public SolrClient getConnection(String core) throws Exception {
        // getFactory() already returns the cached instance when present,
        // so the separate pre-check from the original is folded in here.
        return getFactory().getSolrClient(core);
    }
}

+ 555 - 0
solr-starter/src/main/java/com/yihu/solr/SolrUtil.java

@ -0,0 +1,555 @@
package com.yihu.solr;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.response.*;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.FacetParams;
import org.apache.solr.common.params.GroupParams;
import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import java.util.*;
/**
 * Solr底层查询类
 *
 * @author hzp
 * @version 1.0
 * @created 2016.04.26
 */
@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class SolrUtil {
    private static final Logger logger = LoggerFactory.getLogger(SolrUtil.class);
    private final static String ASC = "asc";
    @Autowired
    private SolrPool pool;
    /**
     * Convenience query: no filter query and no field restriction.
     */
    public SolrDocumentList query(String tableName, String q, Map<String, String> sort, long start, long rows) throws Exception {
        return query(tableName, q, null, sort, start, rows, null);
    }
    /**
     * Convenience query that restricts the returned fields.
     */
    public SolrDocumentList queryReturnFieldList(String tableName, String q, String fq, Map<String, String> sort, long start, long rows, String... fields) throws Exception {
        return query(tableName, q, fq, sort, start, rows, fields);
    }
    /**
     * Core Solr query.
     *
     * @param core   core name
     * @param q      query string, "*:*" when null/empty
     * @param fq     optional filter query
     * @param sort   field -> "asc"/"desc" sort map, may be null
     * @param start  first row (zero-based)
     * @param rows   number of rows to fetch
     * @param fields fields to return (null/empty for all)
     * @return matched documents
     */
    public SolrDocumentList query(String core, String q, String fq, Map<String, String> sort, long start, long rows, String... fields) throws Exception {
        SolrClient conn = pool.getConnection(core);
        SolrQuery query = new SolrQuery();
        if (q != null && !q.isEmpty()) {
            query.setQuery(q);
        } else {
            query.setQuery("*:*");
        }
        if (fq != null && !fq.isEmpty()) {
            query.setFilterQueries(fq);
        }
        query.setFields(fields);
        // Math.toIntExact fails loudly on values beyond int range instead of
        // the original Integer.parseInt(String.valueOf(...)) round-trip.
        query.setStart(Math.toIntExact(start));
        query.setRows(Math.toIntExact(rows));
        if (sort != null) {
            for (Map.Entry<String, String> entry : sort.entrySet()) {
                // Fixed: the original compared Strings with '==' (always false
                // against a fresh toLowerCase() result) before falling back to
                // equals(); equalsIgnoreCase covers the intent directly.
                if (ASC.equalsIgnoreCase(entry.getValue())) {
                    query.addSort(entry.getKey(), SolrQuery.ORDER.asc);
                } else {
                    query.addSort(entry.getKey(), SolrQuery.ORDER.desc);
                }
            }
        }
        QueryResponse rsp = conn.query(query);
        return rsp.getResults();
    }
    /**
     * Distinct query on a single field via Solr result grouping.
     *
     * @param q          optional query string, "*:*" when empty
     * @param fq         optional filter query
     * @param sort       optional field -> "asc"/"desc" sort map
     * @param start      first row (zero-based)
     * @param rows       requested row count — NOTE(review): currently ignored,
     *                   see setRows below; confirm whether capping is intended
     * @param fields     fields to return
     * @param groupField field to group (deduplicate) on
     * @param groupSort  optional intra-group sort, e.g. "event_date asc"
     * @return one Group per distinct value of groupField
     */
    public List<Group> queryDistinctOneField(String core, String q, String fq, Map<String, String> sort, int start, int rows,
                                             String[] fields, String groupField, String groupSort) throws Exception {
        SolrClient conn = pool.getConnection(core);
        SolrQuery query = new SolrQuery();
        if (StringUtils.isNotEmpty(q)) {
            query.setQuery(q);
        } else {
            query.setQuery("*:*");
        }
        if (StringUtils.isNotEmpty(fq)) {
            query.setFilterQueries(fq);
        }
        if (sort != null) {
            for (Object co : sort.keySet()) {
                if (ASC.equals(sort.get(co).toLowerCase())) {
                    query.addSort(co.toString(), SolrQuery.ORDER.asc);
                } else {
                    query.addSort(co.toString(), SolrQuery.ORDER.desc);
                }
            }
        }
        query.setFields(fields);
        query.setStart(start);
        // NOTE(review): hard-coded cap fetches (up to) all groups and ignores
        // the 'rows' parameter entirely — confirm this is intentional.
        query.setRows(10000000);
        // Enable grouped output keyed on groupField.
        query.setParam(GroupParams.GROUP, true);
        query.setParam(GroupParams.GROUP_FORMAT, "grouped");
        query.setParam(GroupParams.GROUP_FIELD, groupField);
        if (StringUtils.isNotEmpty(groupSort)) {
            query.setParam(GroupParams.GROUP_SORT, groupSort);
        }
        List<Group> groups = new ArrayList<>();
        QueryResponse response = conn.query(query);
        GroupResponse groupResponse = response.getGroupResponse();
        if (groupResponse != null) {
            List<GroupCommand> groupList = groupResponse.getValues();
            // Only one group field is requested, so a single command is
            // expected; with several, later commands would overwrite 'groups'.
            for (GroupCommand groupCommand : groupList) {
                groups = groupCommand.getValues();
            }
        }
        return groups;
    }
    /**
     * Distinct query on a single field, flattened to a document list: the
     * first document of each group is returned.
     *
     * @param q          optional query string, "*:*" when empty
     * @param fq         optional filter query
     * @param sort       optional field -> "asc"/"desc" sort map
     * @param start      first row (zero-based)
     * @param rows       requested row count (forwarded to queryDistinctOneField)
     * @param fields     fields to return
     * @param groupField field to group (deduplicate) on
     * @param groupSort  optional intra-group sort, e.g. "event_date asc"
     * @return first document of each group
     */
    public SolrDocumentList queryDistinctOneFieldForDocList(String core, String q, String fq, Map<String, String> sort, int start, int rows,
                                                            String[] fields, String groupField, String groupSort) throws Exception {
        SolrDocumentList solrDocumentList = new SolrDocumentList();
        // Fixed: the original fetched the groups, then ran a second, completely
        // empty SolrQuery against the core and read the (absent) group response
        // from THAT query, discarding the real results. Build the list directly
        // from the grouped query instead.
        List<Group> groups = queryDistinctOneField(core, q, fq, sort, start, rows, fields, groupField, groupSort);
        for (Group group : groups) {
            if (group.getResult() != null && group.getResult().size() > 0) {
                solrDocumentList.add(group.getResult().get(0));
            }
        }
        return solrDocumentList;
    }
    /**
     * Query with multiple filter queries.
     *
     * @param core  core name
     * @param q     query string, "*:*" when null/empty
     * @param fq    filter queries (all applied)
     * @param sort  field -> "asc"/"desc" sort map, may be null
     * @param start first row (zero-based)
     * @param rows  number of rows to fetch
     * @return matched documents
     */
    public SolrDocumentList queryByfqs(String core, String q, String[] fq, Map<String, String> sort, long start, long rows) throws Exception {
        SolrClient conn = pool.getConnection(core);
        SolrQuery query = new SolrQuery();
        if (q != null && !q.isEmpty()) {
            query.setQuery(q);
        } else {
            query.setQuery("*:*");
        }
        if (fq != null && fq.length > 0) {
            query.setFilterQueries(fq);
        }
        // Math.toIntExact fails loudly on values beyond int range instead of
        // the original Integer.parseInt(String.valueOf(...)) round-trip.
        query.setStart(Math.toIntExact(start));
        query.setRows(Math.toIntExact(rows));
        if (sort != null) {
            for (Map.Entry<String, String> entry : sort.entrySet()) {
                // Fixed: the original compared Strings with '==' (always false
                // against a fresh toLowerCase() result) before falling back to
                // equals(); equalsIgnoreCase covers the intent directly.
                if (ASC.equalsIgnoreCase(entry.getValue())) {
                    query.addSort(entry.getKey(), SolrQuery.ORDER.asc);
                } else {
                    query.addSort(entry.getKey(), SolrQuery.ORDER.desc);
                }
            }
        }
        QueryResponse rsp = conn.query(query);
        return rsp.getResults();
    }
    /**
     * Counts documents matching the query, with no filter query.
     */
    public long count(String core, String q) throws Exception {
        return count(core, q, null);
    }
    /**
     * Counts the documents matching {@code q} (default "*:*") under the
     * optional filter query {@code fq}, without fetching any rows.
     */
    public long count(String core, String q, String fq) throws Exception {
        SolrClient conn = pool.getConnection(core);
        SolrQuery query = new SolrQuery(q == null || q.isEmpty() ? "*:*" : q);
        if (fq != null && !fq.isEmpty()) {
            query.setFilterQueries(fq);
        }
        // rows=0: only the total hit count is needed, no documents.
        query.setStart(0);
        query.setRows(0);
        QueryResponse rsp = conn.query(query);
        return rsp.getResults().getNumFound();
    }
    /**
     * Single-field facet count (start is zero-based).
     *
     * @param core       core name
     * @param q          query string, "*:*" when empty
     * @param fq         optional filter query
     * @param groupField field to facet on
     * @param start      facet offset
     * @param limit      max facet entries per request, negative for unlimited
     * @return facet value -> count, zero-count buckets excluded
     */
    public Map<String, Long> groupCount(String core, String q, String fq, String groupField, int start, int limit) throws Exception {
        SolrClient conn = pool.getConnection(core);
        SolrQuery query = new SolrQuery(q == null || q.isEmpty() ? "*:*" : q);
        if (fq != null && !fq.isEmpty()) {
            query.setFilterQueries(fq);
        }
        query.setFacet(true);                    // facet=on
        query.setRows(0);                        // counts only, no documents
        query.addFacetField(groupField);
        query.setFacetLimit(limit);
        query.set(FacetParams.FACET_OFFSET, start);
        query.setFacetMissing(false);            // skip the null-value bucket
        query.setFacetMinCount(0);               // let Solr return zero-count buckets
        QueryResponse rsp = conn.query(query);
        Map<String, Long> result = new HashMap<>();
        for (FacetField.Count count : rsp.getFacetField(groupField).getValues()) {
            // Zero-count buckets are filtered out of the returned map.
            if (count.getCount() > 0) {
                result.put(count.getName(), count.getCount());
            }
        }
        return result;
    }
    /**
     * Facet counts over several fields, each computed independently.
     *
     * @param core        core name
     * @param q           query string, "*:*" when empty
     * @param fq          optional filter query
     * @param groupFields fields to facet on
     * @return one FacetField per requested field
     */
    public List<FacetField> groupCount(String core, String q, String fq, String[] groupFields) throws Exception {
        SolrClient conn = pool.getConnection(core);
        SolrQuery query = new SolrQuery(q == null || q.isEmpty() ? "*:*" : q);
        if (fq != null && !fq.isEmpty()) {
            query.setFilterQueries(fq);
        }
        query.setFacet(true);                    // facet=on
        query.setRows(0);                        // counts only, no documents
        query.addFacetField(groupFields);
        query.setFacetLimit(-1);                 // no bucket cap
        query.set(FacetParams.FACET_OFFSET, 0);
        query.setFacetMissing(false);            // skip the null-value bucket
        query.setFacetMinCount(0);               // include zero-count buckets
        QueryResponse rsp = conn.query(query);
        return rsp.getFacetFields();
    }
    /**
     * Multi-field pivot facet (fields correlated, comma-separated list).
     *
     * @param core        core name
     * @param q           query string, "*:*" when empty
     * @param fq          optional filter query
     * @param groupFields comma-separated pivot field list
     * @param start       facet offset
     * @param limit       max buckets per request, negative for unlimited
     * @return pivot counts, or null when Solr returned no pivot data
     */
    public List<PivotField> groupCountMult(String core, String q, String fq, String groupFields, int start, int limit) throws Exception {
        SolrClient conn = pool.getConnection(core);
        SolrQuery query = new SolrQuery(q == null || q.isEmpty() ? "*:*" : q);
        if (fq != null && !fq.isEmpty()) {
            query.setFilterQueries(fq);
        }
        query.setFacet(true);                    // facet=on
        query.setRows(0);                        // counts only, no documents
        query.addFacetPivotField(groupFields);
        query.set(FacetParams.FACET_OFFSET, start);
        query.setFacetLimit(limit);
        query.setFacetMissing(false);            // skip the null-value bucket
        query.setFacetMinCount(0);               // include zero-count buckets
        NamedList<List<PivotField>> pivots = conn.query(query).getFacetPivot();
        if (pivots == null || pivots.size() == 0) {
            return null;
        }
        return pivots.getVal(0);
    }
    /**
     * Field statistics (min/max/sum/...) over the matching documents.
     *
     * @param core       core name
     * @param q          query string, "*:*" when empty
     * @param fq         optional filter query
     * @param statsField field to compute statistics for
     * @return stats for the field, or null when Solr returned none
     */
    public FieldStatsInfo getStats(String core, String q, String fq, String statsField) throws Exception {
        SolrClient conn = pool.getConnection(core);
        SolrQuery query = new SolrQuery(q == null || q.isEmpty() ? "*:*" : q);
        if (fq != null && !fq.isEmpty()) {
            query.setFilterQueries(fq);
        }
        query.addGetFieldStatistics(statsField);
        query.setRows(0);                        // stats only, no documents
        Map<String, FieldStatsInfo> stats = conn.query(query).getFieldStatsInfo();
        if (stats == null || stats.isEmpty()) {
            return null;
        }
        return stats.get(statsField);
    }
    /**
     * Field statistics split (faceted) by another field.
     *
     * @param core       core name
     * @param q          query string, "*:*" when empty
     * @param fq         optional filter query
     * @param statsField field to compute statistics for
     * @param groupField facet field used to split the statistics
     * @return per-group stats, or null when unavailable
     */
    public List<FieldStatsInfo> getStats(String core, String q, String fq, String statsField, String groupField) throws Exception {
        SolrClient conn = pool.getConnection(core);
        SolrQuery query = new SolrQuery(q == null || q.isEmpty() ? "*:*" : q);
        if (fq != null && !fq.isEmpty()) {
            query.setFilterQueries(fq);
        }
        query.addGetFieldStatistics(statsField);
        query.addStatsFieldFacets(statsField, groupField);
        query.setRows(0);                        // stats only, no documents
        Map<String, FieldStatsInfo> stats = conn.query(query).getFieldStatsInfo();
        if (stats != null && !stats.isEmpty()) {
            Map<String, List<FieldStatsInfo>> facets = stats.get(statsField).getFacets();
            if (facets != null) {
                return facets.get(groupField);
            }
        }
        return null;
    }
    /**
     * Facet-query count: returns the number of documents matching each
     * facet.query expression.
     *
     * @param core       core name
     * @param facetQuery the facet.query expression to count
     * @return map of facet query string to matching document count
     */
    public Map<String, Integer> getFacetQuery(String core, String facetQuery) throws Exception {
        SolrClient conn = pool.getConnection(core);
        SolrQuery query = new SolrQuery();
        query.setQuery("*:*");
        query.setFacet(true);
        query.addFacetQuery(facetQuery);
        // Only the facet counts are needed; skip fetching documents
        // (consistent with the other facet helpers in this class).
        query.setRows(0);
        QueryResponse resp = conn.query(query);
        return resp.getFacetQuery();
    }
    /**
     * Single-field facet (group) counts.
     *
     * @param core       core name
     * @param facetField field to facet on
     * @param fq         filter query; ignored when blank
     * @param minCount   minimum count for a facet value to be returned
     * @param start      offset of the first facet value
     * @param limit      maximum number of facet values to return
     * @param missing    whether to also count documents missing the field
     * @return the facet counts for {@code facetField}
     */
    public FacetField getFacetField(String core, String facetField, String fq, int minCount, int start, int limit, boolean missing) throws Exception {
        SolrClient client = pool.getConnection(core);
        SolrQuery solrQuery = new SolrQuery();
        solrQuery.setQuery("*:*");
        if (!StringUtils.isBlank(fq)) {
            solrQuery.setFilterQueries(fq);
        }
        solrQuery.setStart(start);
        solrQuery.setRows(0); // facet counts only, no documents
        solrQuery.setFacet(true);
        solrQuery.addFacetField(facetField);
        solrQuery.setFacetMinCount(minCount);
        solrQuery.setFacetLimit(limit);
        solrQuery.setFacetMissing(missing);
        QueryResponse response = client.query(solrQuery);
        return response.getFacetField(facetField);
    }
    /**
     * Date-range facet counts: groups documents into buckets of size
     * {@code gap} between {@code startTime} and {@code endTime}.
     *
     * @param core      core name
     * @param dateField date field to bucket on
     * @param startTime lower bound of the range
     * @param endTime   upper bound of the range
     * @param gap       bucket size expression (e.g. "+1DAY")
     * @param fq        filter query; ignored when blank
     * @return the computed range facets
     */
    public List<RangeFacet> getFacetDateRange(String core, String dateField, Date startTime, Date endTime, String gap, String fq) throws Exception {
        SolrClient client = pool.getConnection(core);
        SolrQuery solrQuery = new SolrQuery();
        solrQuery.setQuery("*:*");
        if (!StringUtils.isBlank(fq)) {
            solrQuery.setFilterQueries(fq);
        }
        solrQuery.setRows(0); // counts only, no documents
        solrQuery.setFacet(true);
        solrQuery.addDateRangeFacet(dateField, startTime, endTime, gap);
        QueryResponse response = client.query(solrQuery);
        return response.getFacetRanges();
    }
    /**
     * Date-range facet counts with string-typed range bounds, letting callers
     * pass raw Solr range expressions (e.g. "NOW/DAY-7DAYS").
     *
     * @param core  core name
     * @param field date field to bucket on
     * @param start lower bound expression
     * @param end   upper bound expression
     * @param gap   bucket size expression (e.g. "+1DAY")
     * @param fq    filter query; skipped when null/empty
     * @param q     query string; falls back to "*:*" (match all) when null/empty
     * @return the computed range facets
     */
    public List<RangeFacet> getFacetDateRange(String core, String field, String start, String end, String gap, String fq, String q) throws Exception {
        SolrClient conn = pool.getConnection(core);
        SolrQuery query = new SolrQuery();
        if (StringUtils.isEmpty(q)) {
            query.setQuery("*:*");
        } else {
            query.setQuery(q);
        }
        if (!StringUtils.isEmpty(fq)) {
            query.setFilterQueries(fq);
        }
        // SolrQuery.add and String.format take varargs; the explicit
        // new String[]{...} / new Object[]{...} wrappers were redundant.
        query.setRows(0)
                .setFacet(true)
                .setFacetMissing(false)
                .add("facet.range", field)
                .add(String.format(Locale.ROOT, "f.%s.%s", field, "facet.range.start"), start)
                .add(String.format(Locale.ROOT, "f.%s.%s", field, "facet.range.end"), end)
                .add(String.format(Locale.ROOT, "f.%s.%s", field, "facet.range.gap"), gap);
        QueryResponse resp = conn.query(query);
        return resp.getFacetRanges();
    }
    /**
     * Numeric-range facet counts: groups documents into buckets of size
     * {@code gap} between {@code start} and {@code end}.
     *
     * @param core  core name
     * @param field numeric field to bucket on
     * @param start lower bound of the range
     * @param end   upper bound of the range
     * @param gap   bucket size
     * @param fq    filter query; ignored when blank
     * @return the computed range facets
     */
    public List<RangeFacet> getFacetNumRange(String core, String field, int start, int end, int gap, String fq) throws Exception {
        SolrClient client = pool.getConnection(core);
        SolrQuery solrQuery = new SolrQuery();
        solrQuery.setQuery("*:*");
        if (!StringUtils.isBlank(fq)) {
            solrQuery.setFilterQueries(fq);
        }
        solrQuery.setRows(0); // counts only, no documents
        solrQuery.setFacet(true);
        solrQuery.addNumericRangeFacet(field, start, end, gap);
        QueryResponse response = client.query(solrQuery);
        return response.getFacetRanges();
    }
}

+ 33 - 0
solr-starter/src/main/java/com/yihu/solr/config/SolrConfig.java

@ -0,0 +1,33 @@
package com.yihu.solr.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
 * Configuration holder for Solr connection settings, bound from the
 * {@code spring.data.solr} properties prefix.
 *
 * Created by progr1mmer on 2018/7/26.
 */
@Configuration
@ConfigurationProperties(prefix = "spring.data.solr")
public class SolrConfig {

    // Zookeeper host list for the SolrCloud client (spring.data.solr.zk-host).
    private String zkHost;

    public String getZkHost() {
        return zkHost;
    }

    public void setZkHost(String zkHost) {
        this.zkHost = zkHost;
    }

    /**
     * Logs the resolved configuration after the bean is initialized so the
     * effective settings are visible at startup. Output is unchanged from the
     * previous version; the StringBuilder plus redundant toString() call were
     * replaced with a plain concatenation.
     */
    @PostConstruct
    private void configInfo() {
        String info = "{"
                + "\n  spring.data.solr.zk-host = " + zkHost
                + "\n}";
        System.out.println("Solr.configInfo : " + info);
    }
}