// HBaseDao.java 12 KB
  package com.yihu.ehr.hbase;

  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.client.*;
  import org.apache.hadoop.hbase.filter.CompareFilter;
  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
  import org.apache.hadoop.hbase.filter.RegexStringComparator;
  import org.apache.hadoop.hbase.filter.RowFilter;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.data.hadoop.hbase.RowMapper;
  import org.springframework.data.hadoop.hbase.TableCallback;
  import org.springframework.stereotype.Service;

  import java.io.IOException;
  import java.util.*;

  15. /**
  16. * 数据增删改查
  17. */
  18. @Service
  19. public class HBaseDao extends AbstractHBaseClient {
  20. @Autowired
  21. ObjectMapper objectMapper;
  22. /**
  23. *模糊匹配rowkey
  24. */
  25. public String[] findRowKeys(String tableName, String rowkeyRegEx) throws Exception {
  26. Scan scan = new Scan();
  27. scan.addFamily(Bytes.toBytes("basic"));
  28. scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(rowkeyRegEx)));
  29. List<String> list = new LinkedList<>();
  30. hbaseTemplate.find(tableName, scan, new RowMapper<Void>() {
  31. @Override
  32. public Void mapRow(Result result, int rowNum) throws Exception {
  33. list.add(Bytes.toString(result.getRow()));
  34. return null;
  35. }
  36. });
  37. return list.toArray(new String[list.size()]);
  38. }
  39. /**
  40. *表总条数
  41. */
  42. public Integer count(String tableName) throws Exception {
  43. Scan scan = new Scan();
  44. scan.addFamily(Bytes.toBytes("basic"));
  45. scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^")));
  46. List<String> list = new LinkedList<>();
  47. hbaseTemplate.find(tableName, scan, new RowMapper<Void>() {
  48. @Override
  49. public Void mapRow(Result result, int rowNum) throws Exception {
  50. list.add(Bytes.toString(result.getRow()));
  51. return null;
  52. }
  53. });
  54. return list.size();
  55. }
  56. /**
  57. * 根据 rowkey获取一条记录
  58. */
  59. public String get(String tableName, String rowkey) {
  60. return hbaseTemplate.get(tableName, rowkey,new RowMapper<String>() {
  61. public String mapRow(Result result, int rowNum) throws Exception {
  62. if(!result.isEmpty())
  63. {
  64. List<Cell> ceList = result.listCells();
  65. Map<String, Object> map = new HashMap<String, Object>();
  66. map.put("rowkey",rowkey);
  67. if (ceList != null && ceList.size() > 0) {
  68. for (Cell cell : ceList) {
  69. //默认不加列族
  70. // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_"
  71. map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
  72. Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
  73. }
  74. }
  75. return objectMapper.writeValueAsString(map);
  76. }
  77. else{
  78. return "";
  79. }
  80. }
  81. });
  82. }
  83. /**
  84. * 通过表名 key 和 列族 和列 获取一个数据
  85. */
  86. public String get(String tableName ,String rowkey, String familyName, String qualifier) {
  87. return hbaseTemplate.get(tableName, rowkey,familyName,qualifier ,new RowMapper<String>(){
  88. public String mapRow(Result result, int rowNum) throws Exception {
  89. List<Cell> ceList = result.listCells();
  90. String res = "";
  91. if(ceList!=null&&ceList.size()>0){
  92. for(Cell cell:ceList){
  93. res = Bytes.toString( cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
  94. }
  95. }
  96. return res;
  97. }
  98. });
  99. }
  100. /**
  101. * 通过rowkey获取某行数据
  102. */
  103. public Result getResult(String tableName, String rowKey) throws Exception {
  104. return hbaseTemplate.get(tableName, rowKey, new RowMapper<Result>() {
  105. public Result mapRow(Result result, int rowNum) throws Exception {
  106. return result;
  107. }
  108. });
  109. }
  110. /**
  111. * 通过rowkey获取多行数据
  112. */
  113. public Result[] getResultList(String tableName,List<String> rowKeys) throws Exception {
  114. return hbaseTemplate.execute(tableName, new TableCallback<Result[]>() {
  115. public Result[] doInTable(HTableInterface table) throws Throwable {
  116. List<Get> list = new ArrayList<Get>();
  117. for (String rowKey : rowKeys) {
  118. Get get = new Get(Bytes.toBytes(rowKey));
  119. list.add(get);
  120. }
  121. return table.get(list);
  122. }
  123. });
  124. }
  125. /**
  126. * 通过rowkey获取某行数据
  127. */
  128. public Map<String, Object> getResultMap(String tableName, String rowKey) throws Exception {
  129. return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Object>>() {
  130. public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {
  131. Map<String, Object> map = null;
  132. if(result!=null) {
  133. List<Cell> ceList = result.listCells();
  134. if (ceList != null && ceList.size() > 0) {
  135. map = new HashMap<String, Object>();
  136. map.put("rowkey", rowKey);
  137. for (Cell cell : ceList) {
  138. //默认不加列族
  139. // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_"
  140. map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
  141. Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
  142. }
  143. }
  144. }
  145. return map;
  146. }
  147. });
  148. }
  149. /**
  150. * 修改某行某列值
  151. */
  152. public void put(String tableName ,String rowkey, String familyName, String qualifier,String value) throws Exception
  153. {
  154. hbaseTemplate.execute(tableName, new TableCallback<String>() {
  155. public String doInTable(HTableInterface table) throws Throwable {
  156. Put p = new Put(rowkey.getBytes());
  157. p.add(familyName.getBytes(), qualifier.getBytes(), value.getBytes());
  158. table.put(p);
  159. return null;
  160. }
  161. });
  162. }
  163. /**
  164. * 新增行
  165. */
  166. public void add(String tableName , String rowkey, Map<String,Map<String,String>> family) throws Exception
  167. {
  168. hbaseTemplate.execute(tableName, new TableCallback<String>() {
  169. public String doInTable(HTableInterface table) throws Throwable {
  170. Put p = new Put(rowkey.getBytes());
  171. for(String familyName : family.keySet())
  172. {
  173. Map<String,String> map = family.get(familyName);
  174. for (String qualifier : map.keySet())
  175. {
  176. String value = map.get(qualifier);
  177. p.add(familyName.getBytes(), qualifier.getBytes(), value.getBytes());
  178. }
  179. }
  180. table.put(p);
  181. return null;
  182. }
  183. });
  184. }
  185. /**
  186. * 新增数据
  187. */
  188. public void add(String tableName, String rowKey, String family, Object[] columns, Object[] values) throws Exception {
  189. hbaseTemplate.execute(tableName, new TableCallback<Object>() {
  190. public Object doInTable(HTableInterface htable) throws Throwable {
  191. Put put = new Put(Bytes.toBytes(rowKey));
  192. for (int j = 0; j < columns.length; j++) {
  193. //为空字段不保存
  194. if(values[j]!=null)
  195. {
  196. String column = String.valueOf(columns[j]);
  197. String value = String.valueOf(values[j]);
  198. put.addColumn(Bytes.toBytes(family),
  199. Bytes.toBytes(column),
  200. Bytes.toBytes(value));
  201. }
  202. }
  203. htable.put(put);
  204. return null;
  205. }
  206. });
  207. }
  208. /**
  209. * 根据 rowkey删除一条记录
  210. */
  211. public void delete(String tableName, String rowkey) {
  212. hbaseTemplate.execute(tableName, new TableCallback<String>() {
  213. public String doInTable(HTableInterface table) throws Throwable {
  214. Delete d = new Delete(rowkey.getBytes());
  215. table.delete(d);
  216. return null;
  217. }
  218. });
  219. }
  220. /**
  221. * 批量删除数据
  222. */
  223. public Object[] deleteBatch(String tableName, String[] rowKeys) throws Exception {
  224. return hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
  225. public Object[] doInTable(HTableInterface table) throws Throwable {
  226. List<Delete> deletes = new ArrayList<>(rowKeys.length);
  227. for (String rowKey : rowKeys) {
  228. Delete delete = new Delete(Bytes.toBytes(rowKey));
  229. deletes.add(delete);
  230. }
  231. Object[] results = new Object[deletes.size()];
  232. table.batch(deletes, results);
  233. return results;
  234. }
  235. });
  236. }
  237. /**
  238. * 删除列族
  239. */
  240. public void deleteFamily(String tableName, String rowKey, String familyName) throws Exception {
  241. hbaseTemplate.delete(tableName, rowKey, familyName);
  242. }
  243. /**
  244. * 删除某列
  245. */
  246. public void deleteColumn(String tableName, String rowKey, String familyName, String columnName) throws Exception {
  247. hbaseTemplate.delete(tableName, rowKey, familyName, columnName);
  248. }
  249. /************************************* Bean使用原型模式 ***************************************************************/
  250. /**
  251. * 保存数据 原型模式
  252. */
  253. public void save(String tableName, TableBundle tableBundle) throws Exception {
  254. hbaseTemplate.execute(tableName, new TableCallback<Object>() {
  255. public Object doInTable(HTableInterface htable) throws Throwable {
  256. List<Put> puts = tableBundle.putOperations();
  257. Object[] results = new Object[puts.size()];
  258. htable.batch(puts, results);
  259. return null;
  260. }
  261. });
  262. }
  263. /**
  264. * 查询数据 原型模式
  265. */
  266. public Object[] get(String tableName, TableBundle tableBundle) {
  267. return hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
  268. public Object[] doInTable(HTableInterface table) throws Throwable {
  269. List<Get> gets = tableBundle.getOperations();
  270. Object[] results = new Object[gets.size()];
  271. table.batch(gets, results);
  272. if (results.length > 0 && results[0].toString().equals("keyvalues=NONE")) return null;
  273. return results;
  274. }
  275. });
  276. }
  277. /**
  278. * 删除数据 原型模式
  279. */
  280. public void delete(String tableName, TableBundle tableBundle) {
  281. hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
  282. public Object[] doInTable(HTableInterface table) throws Throwable {
  283. List<Delete> deletes = tableBundle.deleteOperations();
  284. Object[] results = new Object[deletes.size()];
  285. table.batch(deletes, results);
  286. return null;
  287. }
  288. });
  289. }
  290. }