package com.yihu.ehr.hbase; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.util.Bytes; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.hadoop.hbase.RowMapper; import org.springframework.data.hadoop.hbase.TableCallback; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.*; /** * 数据增删改查 */ @Service public class HBaseDao extends AbstractHBaseClient { @Autowired ObjectMapper objectMapper; /** *模糊匹配rowkey */ public String[] findRowKeys(String tableName, String rowkeyRegEx) throws Exception { Scan scan = new Scan(); scan.addFamily(Bytes.toBytes("basic")); scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(rowkeyRegEx))); List list = new LinkedList<>(); hbaseTemplate.find(tableName, scan, new RowMapper() { @Override public Void mapRow(Result result, int rowNum) throws Exception { list.add(Bytes.toString(result.getRow())); return null; } }); return list.toArray(new String[list.size()]); } /** *表总条数 */ public Integer count(String tableName) throws Exception { Scan scan = new Scan(); scan.addFamily(Bytes.toBytes("basic")); scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^"))); List list = new LinkedList<>(); hbaseTemplate.find(tableName, scan, new RowMapper() { @Override public Void mapRow(Result result, int rowNum) throws Exception { list.add(Bytes.toString(result.getRow())); return null; } }); return list.size(); } /** * 根据 rowkey获取一条记录 */ public String get(String tableName, String rowkey) { return hbaseTemplate.get(tableName, rowkey,new RowMapper() { public String mapRow(Result result, int rowNum) throws Exception { if(!result.isEmpty()) { List ceList = result.listCells(); Map map = new HashMap(); map.put("rowkey",rowkey); if (ceList != null && ceList.size() > 0) { for (Cell cell : ceList) { //默认不加列族 // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_" map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } } return objectMapper.writeValueAsString(map); } else{ return ""; } } }); } /** * 通过表名 key 和 列族 和列 获取一个数据 */ public String get(String tableName ,String rowkey, String familyName, String qualifier) { return hbaseTemplate.get(tableName, rowkey,familyName,qualifier ,new RowMapper(){ public String mapRow(Result result, int rowNum) throws Exception { List ceList = result.listCells(); String res = ""; if(ceList!=null&&ceList.size()>0){ for(Cell cell:ceList){ res = Bytes.toString( cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); } } return res; } }); } /** * 通过rowkey获取某行数据 */ public Result getResult(String tableName, String rowKey) throws Exception { return hbaseTemplate.get(tableName, rowKey, new RowMapper() { public Result mapRow(Result result, int rowNum) throws Exception { return result; } }); } /** * 通过rowkey获取多行数据 */ public Result[] getResultList(String tableName,List rowKeys) throws Exception { return hbaseTemplate.execute(tableName, new TableCallback() { public Result[] doInTable(HTableInterface table) throws Throwable { List list = new ArrayList(); for (String rowKey : rowKeys) { Get get = new Get(Bytes.toBytes(rowKey)); list.add(get); } return table.get(list); } }); } /** * 通过rowkey获取某行数据 */ public Map getResultMap(String tableName, String rowKey) throws Exception { return hbaseTemplate.get(tableName, rowKey, new RowMapper>() { public Map mapRow(Result result, int rowNum) throws Exception { Map map = null; if(result!=null) { List ceList = result.listCells(); if (ceList != null && ceList.size() > 0) { map = new HashMap(); map.put("rowkey", rowKey); for (Cell cell : ceList) { //默认不加列族 // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_" map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } } } return map; } }); } /** * 修改某行某列值 */ public void put(String tableName ,String rowkey, String familyName, String qualifier,String value) throws Exception { hbaseTemplate.execute(tableName, new TableCallback() { public String doInTable(HTableInterface table) throws Throwable { Put p = new Put(rowkey.getBytes()); p.add(familyName.getBytes(), qualifier.getBytes(), value.getBytes()); table.put(p); return null; } }); } /** * 新增行 */ public void add(String tableName , String rowkey, Map> family) throws Exception { hbaseTemplate.execute(tableName, new TableCallback() { public String doInTable(HTableInterface table) throws Throwable { Put p = new Put(rowkey.getBytes()); for(String familyName : family.keySet()) { Map map = family.get(familyName); for (String qualifier : map.keySet()) { String value = map.get(qualifier); p.add(familyName.getBytes(), qualifier.getBytes(), value.getBytes()); } } table.put(p); return null; } }); } /** * 新增数据 */ public void add(String tableName, String rowKey, String family, Object[] columns, Object[] values) throws Exception { hbaseTemplate.execute(tableName, new TableCallback() { public Object doInTable(HTableInterface htable) throws Throwable { Put put = new Put(Bytes.toBytes(rowKey)); for (int j = 0; j < columns.length; j++) { //为空字段不保存 if(values[j]!=null) { String column = String.valueOf(columns[j]); String value = String.valueOf(values[j]); put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value)); } } htable.put(put); return null; } }); } /** * 根据 rowkey删除一条记录 */ public void delete(String tableName, String rowkey) { hbaseTemplate.execute(tableName, new TableCallback() { public String doInTable(HTableInterface table) throws Throwable { Delete d = new Delete(rowkey.getBytes()); table.delete(d); return null; } }); } /** * 批量删除数据 */ public Object[] deleteBatch(String tableName, String[] rowKeys) throws Exception { return hbaseTemplate.execute(tableName, new TableCallback() { public Object[] doInTable(HTableInterface table) throws Throwable { List deletes = new ArrayList<>(rowKeys.length); for (String rowKey : rowKeys) { Delete delete = new Delete(Bytes.toBytes(rowKey)); deletes.add(delete); } Object[] results = new Object[deletes.size()]; table.batch(deletes, results); return results; } }); } /** * 删除列族 */ public void deleteFamily(String tableName, String rowKey, String familyName) throws Exception { hbaseTemplate.delete(tableName, rowKey, familyName); } /** * 删除某列 */ public void deleteColumn(String tableName, String rowKey, String familyName, String columnName) throws Exception { hbaseTemplate.delete(tableName, rowKey, familyName, columnName); } /************************************* Bean使用原型模式 ***************************************************************/ /** * 保存数据 原型模式 */ public void save(String tableName, TableBundle tableBundle) throws Exception { hbaseTemplate.execute(tableName, new TableCallback() { public Object doInTable(HTableInterface htable) throws Throwable { List puts = tableBundle.putOperations(); Object[] results = new Object[puts.size()]; htable.batch(puts, results); return null; } }); } /** * 查询数据 原型模式 */ public Object[] get(String tableName, TableBundle tableBundle) { return hbaseTemplate.execute(tableName, new TableCallback() { public Object[] doInTable(HTableInterface table) throws Throwable { List gets = tableBundle.getOperations(); Object[] results = new Object[gets.size()]; table.batch(gets, results); if (results.length > 0 && results[0].toString().equals("keyvalues=NONE")) return null; return results; } }); } /** * 删除数据 原型模式 */ public void delete(String tableName, TableBundle tableBundle) { hbaseTemplate.execute(tableName, new TableCallback() { public Object[] doInTable(HTableInterface table) throws Throwable { List deletes = tableBundle.deleteOperations(); Object[] results = new Object[deletes.size()]; table.batch(deletes, results); return null; } }); } }