HBaseHelper.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. package com.yihu.base.hbase;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import org.apache.hadoop.hbase.Cell;
  4. import org.apache.hadoop.hbase.CellUtil;
  5. import org.apache.hadoop.hbase.client.*;
  6. import org.apache.hadoop.hbase.filter.CompareFilter;
  7. import org.apache.hadoop.hbase.filter.RegexStringComparator;
  8. import org.apache.hadoop.hbase.filter.RowFilter;
  9. import org.apache.hadoop.hbase.util.Bytes;
  10. import org.springframework.data.hadoop.hbase.HbaseTemplate;
  11. import org.springframework.data.hadoop.hbase.RowMapper;
  12. import org.springframework.data.hadoop.hbase.TableCallback;
  13. import org.springframework.util.StringUtils;
  14. import java.util.*;
  15. /**
  16. * Hbase - DML
  17. * 数据增删改查
  18. */
  19. public class HBaseHelper extends AbstractHBaseClient {
  20. private ObjectMapper objectMapper = new ObjectMapper();
  21. public HBaseHelper(HbaseTemplate hbaseTemplate) {
  22. super(hbaseTemplate);
  23. }
  24. /**
  25. * 新增数据 - 多列族
  26. * @param tableName
  27. * @param rowKey
  28. * @param family
  29. * @throws Exception
  30. */
  31. public void add(String tableName , String rowKey, Map<String, Map<String, String>> family) {
  32. hbaseTemplate.execute(tableName, new TableCallback<Void>() {
  33. @Override
  34. public Void doInTable(HTableInterface table) throws Throwable {
  35. Put p = new Put(rowKey.getBytes());
  36. for (String familyName : family.keySet()) {
  37. Map<String, String> map = family.get(familyName);
  38. for (String qualifier : map.keySet()) {
  39. String value = map.get(qualifier);
  40. p.addColumn(familyName.getBytes(), qualifier.getBytes(), value.getBytes());
  41. }
  42. }
  43. table.put(p);
  44. return null;
  45. }
  46. });
  47. }
  48. /**
  49. * 新增数据 - 单列族
  50. * @param tableName
  51. * @param rowKey
  52. * @param family
  53. * @param columns
  54. * @param values
  55. */
  56. public void add(String tableName, String rowKey, String family, Object[] columns, Object[] values) {
  57. hbaseTemplate.execute(tableName, new TableCallback<Void>() {
  58. @Override
  59. public Void doInTable(HTableInterface table) throws Throwable {
  60. Put put = new Put(Bytes.toBytes(rowKey));
  61. for (int j = 0; j < columns.length; j++) {
  62. //为空字段不保存
  63. if (values[j] != null) {
  64. String column = String.valueOf(columns[j]);
  65. String value = String.valueOf(values[j]);
  66. put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
  67. }
  68. }
  69. table.put(put);
  70. return null;
  71. }
  72. });
  73. }
  74. /**
  75. * 删除记录
  76. * @param tableName
  77. * @param rowKey
  78. */
  79. public void delete(String tableName, String rowKey) {
  80. hbaseTemplate.execute(tableName, new TableCallback<Void>() {
  81. @Override
  82. public Void doInTable(HTableInterface table) throws Throwable {
  83. Delete d = new Delete(rowKey.getBytes());
  84. table.delete(d);
  85. return null;
  86. }
  87. });
  88. }
  89. /**
  90. * 批量删除数据
  91. * @param tableName
  92. * @param rowKeys
  93. * @return
  94. * @throws Exception
  95. */
  96. public Object[] deleteBatch(String tableName, String[] rowKeys) {
  97. return hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
  98. @Override
  99. public Object[] doInTable(HTableInterface table) throws Throwable {
  100. List<Delete> deletes = new ArrayList<>(rowKeys.length);
  101. for (String rowKey : rowKeys) {
  102. Delete delete = new Delete(Bytes.toBytes(rowKey));
  103. deletes.add(delete);
  104. }
  105. Object[] results = new Object[deletes.size()];
  106. table.batch(deletes, results);
  107. return results;
  108. }
  109. });
  110. }
  111. /**
  112. * 删除列族
  113. * @param tableName
  114. * @param rowKey
  115. * @param familyName
  116. * @throws Exception
  117. */
  118. public void deleteFamily(String tableName, String rowKey, String familyName) throws Exception {
  119. hbaseTemplate.delete(tableName, rowKey, familyName);
  120. }
  121. /**
  122. * 删除某列
  123. * @param tableName
  124. * @param rowKey
  125. * @param familyName
  126. * @param columnName
  127. * @throws Exception
  128. */
  129. public void deleteColumn(String tableName, String rowKey, String familyName, String columnName) throws Exception {
  130. hbaseTemplate.delete(tableName, rowKey, familyName, columnName);
  131. }
  132. /**
  133. * 修改某行某列值
  134. */
  135. public void put(String tableName ,String rowKey, String familyName, String qualifier, String value) throws Exception {
  136. hbaseTemplate.put(tableName, rowKey, familyName, qualifier, value.getBytes());
  137. }
  138. /**
  139. * 模糊匹配rowKey
  140. * @param tableName 表名
  141. * @param rowKeyRegEx 表达式
  142. * @return
  143. * @throws Exception
  144. */
  145. public String[] findRowKeys(String tableName, String rowKeyRegEx) throws Exception {
  146. Scan scan = new Scan();
  147. scan.addFamily(Bytes.toBytes("basic"));
  148. scan.setStartRow(rowKeyRegEx.substring(1, rowKeyRegEx.length()).getBytes());
  149. //scan.setStopRow(rowKeyRegEx.substring(1, rowKeyRegEx.length()).getBytes());
  150. scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(rowKeyRegEx)));
  151. List<String> list = new LinkedList<>();
  152. hbaseTemplate.find(tableName, scan, new RowMapper<Void>() {
  153. @Override
  154. public Void mapRow(Result result, int rowNum) throws Exception {
  155. list.add(Bytes.toString(result.getRow()));
  156. return null;
  157. }
  158. });
  159. return list.toArray(new String[list.size()]);
  160. }
  161. /**
  162. * 表总条数
  163. * @param tableName
  164. * @return
  165. * @throws Exception
  166. */
  167. public Integer count(String tableName) throws Exception {
  168. Scan scan = new Scan();
  169. scan.addFamily(Bytes.toBytes("basic"));
  170. scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^")));
  171. List<String> list = new LinkedList<>();
  172. hbaseTemplate.find(tableName, scan, new RowMapper<Void>() {
  173. @Override
  174. public Void mapRow(Result result, int rowNum) throws Exception {
  175. list.add(Bytes.toString(result.getRow()));
  176. return null;
  177. }
  178. });
  179. return list.size();
  180. }
  181. /**
  182. * 根据rowKey获取一条记录
  183. * @param tableName
  184. * @param rowKey
  185. * @return 字符串
  186. */
  187. public String get(String tableName, String rowKey) {
  188. return hbaseTemplate.get(tableName, rowKey, new RowMapper<String>() {
  189. @Override
  190. public String mapRow(Result result, int rowNum) throws Exception {
  191. if(!result.isEmpty()) {
  192. List<Cell> ceList = result.listCells();
  193. Map<String, Object> map = new HashMap<String, Object>();
  194. map.put("rowkey", rowKey);
  195. for (Cell cell : ceList) {
  196. // 默认不加列族
  197. // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_"
  198. map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
  199. Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
  200. }
  201. return objectMapper.writeValueAsString(map);
  202. }
  203. else{
  204. return "";
  205. }
  206. }
  207. });
  208. }
  209. /**
  210. * 通过rowKey获取某行数据
  211. * @param tableName
  212. * @param rowKey
  213. * @return Map
  214. */
  215. public Map<String, Object> getResultMap(String tableName, String rowKey) {
  216. return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Object>>() {
  217. @Override
  218. public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {
  219. if(!result.isEmpty()) {
  220. List<Cell> ceList = result.listCells();
  221. Map<String, Object> map = new HashMap<String, Object>();
  222. map.put("rowkey", rowKey);
  223. for (Cell cell : ceList) {
  224. //默认不加列族
  225. // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_"
  226. map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
  227. Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
  228. }
  229. return map;
  230. }else {
  231. return null;
  232. }
  233. }
  234. });
  235. }
  236. /**
  237. * 通过rowKey获取某行数据
  238. * @param tableName
  239. * @param rowKey
  240. * @return
  241. * @throws Exception
  242. */
  243. public Result getResult(String tableName, String rowKey) throws Exception {
  244. return hbaseTemplate.get(tableName, rowKey, new RowMapper<Result>() {
  245. @Override
  246. public Result mapRow(Result result, int rowNum) throws Exception {
  247. return result;
  248. }
  249. });
  250. }
  251. /**
  252. * 通过表名和rowKey获取指定列族下的值
  253. * @param tableName 表名
  254. * @param rowKey rowKey
  255. * @param familyName 列族
  256. * @return
  257. */
  258. public Map<String, String> get(String tableName, String rowKey, String familyName) {
  259. return hbaseTemplate.get(tableName, rowKey, familyName, new RowMapper<Map<String, String>>(){
  260. @Override
  261. public Map<String, String> mapRow(Result result, int rowNum) throws Exception {
  262. Map<String, String> map = new HashMap<>();
  263. NavigableMap<byte[], byte[]> navigableMaps = result.getFamilyMap(familyName.getBytes());
  264. if(null != navigableMaps) {
  265. for (byte[] key : navigableMaps.keySet()) {
  266. String keys = new String(key);
  267. String values = new String(navigableMaps.get(key));
  268. map.put(keys, values);
  269. }
  270. }
  271. return map;
  272. }
  273. });
  274. }
  275. /**
  276. * 通过表名和rowKey获取指定列族下的列名的值
  277. * @param tableName 表名
  278. * @param rowKey rowKey
  279. * @param familyName 列族
  280. * @param qualifier 列名
  281. * @return
  282. */
  283. public String get(String tableName, String rowKey, String familyName, String qualifier) {
  284. return hbaseTemplate.get(tableName, rowKey, familyName, qualifier, new RowMapper<String>(){
  285. @Override
  286. public String mapRow(Result result, int rowNum) throws Exception {
  287. Cell cell = result.getColumnLatestCell(familyName.getBytes(), qualifier.getBytes());
  288. return new String(CellUtil.cloneValue(cell));
  289. }
  290. });
  291. }
  292. /**
  293. * 通过rowKey集合获取指定列名下的多条数据
  294. * @param tableName 表名
  295. * @param rowKeys rowKeys
  296. * @param basicFl basic列族下的列名
  297. * @param dFl d列族下的列名
  298. * @return
  299. * @throws Exception
  300. */
  301. public Result[] getResultList(String tableName, List<String> rowKeys, String basicFl, String dFl) {
  302. return hbaseTemplate.execute(tableName, new TableCallback<Result[]>() {
  303. @Override
  304. public Result[] doInTable(HTableInterface table) throws Throwable {
  305. List<Get> list = new ArrayList<Get>();
  306. for (String rowKey : rowKeys) {
  307. Get get = new Get(Bytes.toBytes(rowKey));
  308. if (!StringUtils.isEmpty(basicFl)) {
  309. String[] basicArr = basicFl.split(",");
  310. for (String basicStr : basicArr) {
  311. get.addColumn(Bytes.toBytes("basic"), Bytes.toBytes(basicStr));
  312. }
  313. }
  314. if (!StringUtils.isEmpty(dFl)) {
  315. String[] dArr = dFl.split(",");
  316. for (String dStr : dArr) {
  317. get.addColumn(Bytes.toBytes("d"), Bytes.toBytes(dStr));
  318. }
  319. }
  320. list.add(get);
  321. }
  322. return table.get(list);
  323. }
  324. });
  325. }
  326. /************************************* Bean使用原型模式 ***************************************************************/
  327. /**
  328. * 保存数据 原型模式
  329. */
  330. public void save(String tableName, TableBundle tableBundle) {
  331. hbaseTemplate.execute(tableName, new TableCallback<Void>() {
  332. @Override
  333. public Void doInTable(HTableInterface table) throws Throwable {
  334. List<Put> puts = tableBundle.putOperations();
  335. Object[] results = new Object[puts.size()];
  336. table.batch(puts, results);
  337. return null;
  338. }
  339. });
  340. }
  341. /**
  342. * 删除数据 原型模式
  343. */
  344. public void delete(String tableName, TableBundle tableBundle) {
  345. hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
  346. @Override
  347. public Object[] doInTable(HTableInterface table) throws Throwable {
  348. List<Delete> deletes = tableBundle.deleteOperations();
  349. Object[] results = new Object[deletes.size()];
  350. table.batch(deletes, results);
  351. return null;
  352. }
  353. });
  354. }
  355. /**
  356. * 查询数据 原型模式
  357. */
  358. public Object[] get(String tableName, TableBundle tableBundle) {
  359. return hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
  360. @Override
  361. public Object[] doInTable(HTableInterface table) throws Throwable {
  362. List<Get> gets = tableBundle.getOperations();
  363. Object[] results = new Object[gets.size()];
  364. table.batch(gets, results);
  365. if (results.length > 0 && results[0].toString().equals("keyvalues=NONE")) {
  366. return null;
  367. }
  368. return results;
  369. }
  370. });
  371. }
  372. }