suxiaoyang пре 6 година
родитељ
комит
af86c3e145

+ 100 - 147
hbase-starter/src/main/java/com/yihu/hbase/HBaseDao.java

@ -9,8 +9,6 @@ import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
@ -18,7 +16,7 @@ import java.util.*;
/**
 * HBase - DML
 * @Author Progr1mmer
 * @author Progr1mmer
 */
@Service
public class HBaseDao extends AbstractHBaseClient {
@ -57,21 +55,18 @@ public class HBaseDao extends AbstractHBaseClient {
     * @param values
     */
    public void add(String tableName, String rowKey, String family, Object[] columns, Object[] values) {
        hbaseTemplate.execute(tableName, new TableCallback<Void>() {
            @Override
            public Void doInTable(HTableInterface table) throws Throwable {
                Put put = new Put(Bytes.toBytes(rowKey));
                for (int j = 0; j < columns.length; j++) {
                    //为空字段不保存
                    if (values[j] != null) {
                        String column = String.valueOf(columns[j]);
                        String value = String.valueOf(values[j]);
                        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
                    }
        hbaseTemplate.execute(tableName, (table) -> {
            Put put = new Put(Bytes.toBytes(rowKey));
            for (int j = 0; j < columns.length; j++) {
                //为空字段不保存
                if (values[j] != null) {
                    String column = String.valueOf(columns[j]);
                    String value = String.valueOf(values[j]);
                    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
                }
                table.put(put);
                return null;
            }
            table.put(put);
            return null;
        });
    }
@ -81,13 +76,10 @@ public class HBaseDao extends AbstractHBaseClient {
     * @param rowKey
     */
    public void delete(String tableName, String rowKey)  {
        hbaseTemplate.execute(tableName, new TableCallback<Void>() {
            @Override
            public Void doInTable(HTableInterface table) throws Throwable {
                Delete d = new Delete(rowKey.getBytes());
                table.delete(d);
                return null;
            }
        hbaseTemplate.execute(tableName, (table) -> {
            Delete d = new Delete(rowKey.getBytes());
            table.delete(d);
            return null;
        });
    }
@ -99,18 +91,15 @@ public class HBaseDao extends AbstractHBaseClient {
     * @throws Exception
     */
    public Object[] deleteBatch(String tableName, String[] rowKeys) {
        return hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
            @Override
            public Object[] doInTable(HTableInterface table) throws Throwable {
                List<Delete> deletes = new ArrayList<>(rowKeys.length);
                for (String rowKey : rowKeys) {
                    Delete delete = new Delete(Bytes.toBytes(rowKey));
                    deletes.add(delete);
                }
                Object[] results = new Object[deletes.size()];
                table.batch(deletes, results);
                return results;
        return hbaseTemplate.execute(tableName, (table) -> {
            List<Delete> deletes = new ArrayList<>(rowKeys.length);
            for (String rowKey : rowKeys) {
                Delete delete = new Delete(Bytes.toBytes(rowKey));
                deletes.add(delete);
            }
            Object[] results = new Object[deletes.size()];
            table.batch(deletes, results);
            return results;
        });
    }
@ -158,12 +147,9 @@ public class HBaseDao extends AbstractHBaseClient {
        scan.setStopRow(stopRow.getBytes());
        scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(rowKeyRegEx)));
        List<String> list = new LinkedList<>();
        hbaseTemplate.find(tableName, scan, new RowMapper<Void>() {
            @Override
            public Void mapRow(Result result, int rowNum) throws Exception {
                list.add(Bytes.toString(result.getRow()));
                return null;
            }
        hbaseTemplate.find(tableName, scan, (result, rowNum) ->  {
            list.add(Bytes.toString(result.getRow()));
            return null;
        });
        return list.toArray(new String[list.size()]);
    }
@ -179,12 +165,9 @@ public class HBaseDao extends AbstractHBaseClient {
        scan.addFamily(Bytes.toBytes("basic"));
        scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^")));
        List<String> list = new LinkedList<>();
        hbaseTemplate.find(tableName, scan, new RowMapper<Void>() {
            @Override
            public Void mapRow(Result result, int rowNum) throws Exception {
                list.add(Bytes.toString(result.getRow()));
                return null;
            }
        hbaseTemplate.find(tableName, scan, (result, rowNum)-> {
            list.add(Bytes.toString(result.getRow()));
            return null;
        });
        return list.size();
    }
@ -196,24 +179,20 @@ public class HBaseDao extends AbstractHBaseClient {
     * @return 字符串
     */
    public String get(String tableName, String rowKey) {
        return hbaseTemplate.get(tableName, rowKey, new RowMapper<String>() {
            @Override
            public String mapRow(Result result, int rowNum) throws Exception {
                if(!result.isEmpty()) {
                    List<Cell> ceList = result.listCells();
                    Map<String, Object> map = new HashMap<String, Object>();
                    map.put("rowkey", rowKey);
                    for (Cell cell : ceList) {
                        // 默认不加列族
                        // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_"
                        map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                                Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                    }
                    return objectMapper.writeValueAsString(map);
                }
                else{
                    return "";
        return hbaseTemplate.get(tableName, rowKey, (result, rowNum) -> {
            if (!result.isEmpty()) {
                List<Cell> ceList = result.listCells();
                Map<String, Object> map = new HashMap<>();
                map.put("rowkey", rowKey);
                for (Cell cell : ceList) {
                    // 默认不加列族
                    // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_"
                    map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
                return objectMapper.writeValueAsString(map);
            } else {
                return "";
            }
        });
    }
@ -225,23 +204,20 @@ public class HBaseDao extends AbstractHBaseClient {
     * @return Map
     */
    public Map<String, Object> getResultMap(String tableName, String rowKey) {
        return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Object>>() {
            @Override
            public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {
                if(!result.isEmpty()) {
                    List<Cell> ceList = result.listCells();
                    Map<String, Object> map = new HashMap<String, Object>();
                    map.put("rowkey", rowKey);
                    for (Cell cell : ceList) {
                        //默认不加列族
                        // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_"
                        map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                                Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                    }
                    return map;
                }else {
                    return null;
        return hbaseTemplate.get(tableName, rowKey, (result, rowNum)-> {
            if (!result.isEmpty()) {
                List<Cell> ceList = result.listCells();
                Map<String, Object> map = new HashMap<String, Object>();
                map.put("rowkey", rowKey);
                for (Cell cell : ceList) {
                    //默认不加列族
                    // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_"
                    map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
                return map;
            } else {
                return null;
            }
        });
    }
@ -254,12 +230,7 @@ public class HBaseDao extends AbstractHBaseClient {
     * @throws Exception
     */
    public Result getResult(String tableName, String rowKey) throws Exception {
        return hbaseTemplate.get(tableName, rowKey, new RowMapper<Result>() {
            @Override
            public Result mapRow(Result result, int rowNum) throws Exception {
                return result;
            }
        });
        return hbaseTemplate.get(tableName, rowKey, (result, rowNum)-> result);
    }
    /**
@ -270,20 +241,17 @@ public class HBaseDao extends AbstractHBaseClient {
     * @return
     */
    public Map<String, String> get(String tableName, String rowKey, String familyName) {
        return hbaseTemplate.get(tableName, rowKey, familyName, new RowMapper<Map<String, String>>(){
            @Override
            public Map<String, String> mapRow(Result result, int rowNum) throws Exception {
                Map<String, String> map = new HashMap<>();
                NavigableMap<byte[], byte[]> navigableMaps = result.getFamilyMap(familyName.getBytes());
                if(null != navigableMaps) {
                    for (byte[] key : navigableMaps.keySet()) {
                        String keys = new String(key);
                        String values = new String(navigableMaps.get(key));
                        map.put(keys, values);
                    }
        return hbaseTemplate.get(tableName, rowKey, familyName, (result, rowNum)-> {
            Map<String, String> map = new HashMap<>();
            NavigableMap<byte[], byte[]> navigableMaps = result.getFamilyMap(familyName.getBytes());
            if(null != navigableMaps) {
                for (byte[] key : navigableMaps.keySet()) {
                    String keys = new String(key);
                    String values = new String(navigableMaps.get(key));
                    map.put(keys, values);
                }
                return map;
            }
            return map;
        });
    }
@ -296,12 +264,9 @@ public class HBaseDao extends AbstractHBaseClient {
     * @return
     */
    public String get(String tableName, String rowKey, String familyName, String qualifier) {
        return hbaseTemplate.get(tableName, rowKey, familyName, qualifier, new RowMapper<String>(){
            @Override
            public String mapRow(Result result, int rowNum) throws Exception {
                Cell cell = result.getColumnLatestCell(familyName.getBytes(), qualifier.getBytes());
                return new String(CellUtil.cloneValue(cell));
            }
        return hbaseTemplate.get(tableName, rowKey, familyName, qualifier, (result, rowNum)-> {
            Cell cell = result.getColumnLatestCell(familyName.getBytes(), qualifier.getBytes());
            return new String(CellUtil.cloneValue(cell));
        });
    }
@ -315,28 +280,25 @@ public class HBaseDao extends AbstractHBaseClient {
     * @throws Exception
     */
    public Result[] getResultList(String tableName, List<String> rowKeys, String basicFl, String dFl) {
        return hbaseTemplate.execute(tableName, new TableCallback<Result[]>() {
            @Override
            public Result[] doInTable(HTableInterface table) throws Throwable {
                List<Get> list = new ArrayList<Get>();
                for (String rowKey : rowKeys) {
                    Get get = new Get(Bytes.toBytes(rowKey));
                    if (!StringUtils.isEmpty(basicFl)) {
                        String[] basicArr = basicFl.split(",");
                        for (String basicStr : basicArr) {
                            get.addColumn(Bytes.toBytes("basic"), Bytes.toBytes(basicStr));
                        }
        return hbaseTemplate.execute(tableName, (table)-> {
            List<Get> list = new ArrayList<>();
            for (String rowKey : rowKeys) {
                Get get = new Get(Bytes.toBytes(rowKey));
                if (!StringUtils.isEmpty(basicFl)) {
                    String[] basicArr = basicFl.split(",");
                    for (String basicStr : basicArr) {
                        get.addColumn(Bytes.toBytes("basic"), Bytes.toBytes(basicStr));
                    }
                    if (!StringUtils.isEmpty(dFl)) {
                        String[] dArr = dFl.split(",");
                        for (String dStr : dArr) {
                            get.addColumn(Bytes.toBytes("d"), Bytes.toBytes(dStr));
                        }
                }
                if (!StringUtils.isEmpty(dFl)) {
                    String[] dArr = dFl.split(",");
                    for (String dStr : dArr) {
                        get.addColumn(Bytes.toBytes("d"), Bytes.toBytes(dStr));
                    }
                    list.add(get);
                }
                return table.get(list);
                list.add(get);
            }
            return table.get(list);
        });
    }
@ -346,14 +308,11 @@ public class HBaseDao extends AbstractHBaseClient {
     * 保存数据 原型模式
     */
    public void save(String tableName, TableBundle tableBundle) {
        hbaseTemplate.execute(tableName, new TableCallback<Void>() {
            @Override
            public Void doInTable(HTableInterface table) throws Throwable {
                List<Put> puts = tableBundle.putOperations();
                Object[] results = new Object[puts.size()];
                table.batch(puts, results);
                return null;
            }
        hbaseTemplate.execute(tableName, (table)-> {
            List<Put> puts = tableBundle.putOperations();
            Object[] results = new Object[puts.size()];
            table.batch(puts, results);
            return null;
        });
    }
@ -361,14 +320,11 @@ public class HBaseDao extends AbstractHBaseClient {
     * 删除数据 原型模式
     */
    public void delete(String tableName, TableBundle tableBundle) {
        hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
            @Override
            public Object[] doInTable(HTableInterface table) throws Throwable {
                List<Delete> deletes = tableBundle.deleteOperations();
                Object[] results = new Object[deletes.size()];
                table.batch(deletes, results);
                return null;
            }
        hbaseTemplate.execute(tableName, (table)-> {
            List<Delete> deletes = tableBundle.deleteOperations();
            Object[] results = new Object[deletes.size()];
            table.batch(deletes, results);
            return null;
        });
    }
@ -376,17 +332,14 @@ public class HBaseDao extends AbstractHBaseClient {
     * 查询数据 原型模式
     */
    public Object[] get(String tableName, TableBundle tableBundle) {
        return hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
            @Override
            public Object[] doInTable(HTableInterface table) throws Throwable {
                List<Get> gets = tableBundle.getOperations();
                Object[] results = new Object[gets.size()];
                table.batch(gets, results);
                if (results.length > 0 && results[0].toString().equals("keyvalues=NONE")) {
                    return null;
                }
                return results;
        return hbaseTemplate.execute(tableName, (table)-> {
            List<Get> gets = tableBundle.getOperations();
            Object[] results = new Object[gets.size()];
            table.batch(gets, results);
            if (results.length > 0 && results[0].toString().equals("keyvalues=NONE")) {
                return null;
            }
            return results;
        });
    }
}

+ 33 - 22
mysql-starter/src/main/java/com/yihu/mysql/query/URLQueryParser.java

@ -8,6 +8,8 @@ import javax.persistence.EntityManager;
import javax.persistence.criteria.*;
import java.text.ParseException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * URL 查询串解析器
@ -18,6 +20,8 @@ import java.util.*;
 */
public class URLQueryParser<T> {
    private static final Pattern PATTERN = Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}");
    private String fields;
    private String filters;
    private String orders;
@ -89,16 +93,16 @@ public class URLQueryParser<T> {
     * @param root
     */
    private void makeSelection(CriteriaBuilder criteriaBuilder, CriteriaQuery query, Root<T> root) {
        if (false/*StringUtils.isNotEmpty(fields)*/) {
        query.select(root);
        /*if (!StringUtils.isEmpty(fields)) {
            String[] fieldArray = fields.split(",");
            List<Selection<T>> selections = new ArrayList<>(fieldArray.length);
            Arrays.stream(fieldArray).forEach(elem -> selections.add(root.get(elem)));
            query.select(criteriaBuilder.tuple(selections.toArray(new Selection[selections.size()])));
        } else {
            query.select(root);
        }
        }*/
    }
    /**
@ -141,8 +145,9 @@ public class URLQueryParser<T> {
     * @param root
     */
    private void makeWhere(CriteriaBuilder criteriaBuilder, CriteriaQuery query, Root<T> root) throws ParseException {
         if (StringUtils.isEmpty(filters)) return;
         if (StringUtils.isEmpty(filters)) {
             return;
         }
        Map<String, Predicate> predicateMap = new HashMap<>();
        String[] filterArray = filters.split(";");
@ -152,36 +157,42 @@ public class URLQueryParser<T> {
            String filter = filterArray[i];
            //查看是否是时间格式 yyyy-MM-dd hh:mm:ss
            String[] tokens;
//            Pattern p = Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}");
//            String[] filters = filter.split("[?]|<>|>=|>|<=|<|=");
//            Matcher m = p.matcher(filters[1]);
//            if (m.matches()) {
//                tokens = new String[]{filter};
//            }else {
//                tokens = filter.split(" ");
//            }
            tokens = filter.split(" ");
            String[] filters = filter.split("[?]|<>|>=|>|<=|<|=");
            Matcher m = PATTERN.matcher(filters[1]);
            if (m.matches()) {
                tokens = new String[]{filter};
            } else {
                tokens = filter.split(" ");
            }
            if (tokens.length > 2){
                for(int j=1; j<tokens.length; j++){
                    if(j==tokens.length-1)
                for (int j = 1; j < tokens.length; j++){
                    if (j == tokens.length - 1) {
                        tokens[1] = tokens[j];
                    else
                        tokens[0] += " " +tokens[j] ;
                    } else {
                        tokens[0] += " " + tokens[j] ;
                    }
                }
            }
            String group = null;
            if (tokens.length >= 2) group = tokens[1];
            if (tokens.length >= 2) {
                group = tokens[1];
            }
            Predicate predicate = splitFilter(tokens[0], criteriaBuilder, root);
            if (group != null) {
                if (predicateMap.get(group) == null)
                if (null == predicateMap.get(group)) {
                    predicateMap.put(group, predicate);
                else
                } else {
                    predicateMap.put(group, criteriaBuilder.or(predicateMap.get(group), predicate));
            } else
                }
            } else {
                predicateMap.put(Integer.toString(i), predicate);
            }
        }
        query.where(predicateMap.values().toArray(new Predicate[predicateMap.size()]));