HBaseHelper.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. package com.yihu.base.hbase;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import org.apache.hadoop.hbase.Cell;
  4. import org.apache.hadoop.hbase.client.*;
  5. import org.apache.hadoop.hbase.filter.CompareFilter;
  6. import org.apache.hadoop.hbase.filter.RegexStringComparator;
  7. import org.apache.hadoop.hbase.filter.RowFilter;
  8. import org.apache.hadoop.hbase.util.Bytes;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.data.hadoop.hbase.RowMapper;
  11. import org.springframework.data.hadoop.hbase.TableCallback;
  12. import org.springframework.stereotype.Service;
  13. import java.io.IOException;
  14. import java.util.*;
  15. /**
  16. * 数据增删改查
  17. */
  18. @Service
  19. public class HBaseHelper extends AbstractHBaseClient {
  20. @Autowired
  21. ObjectMapper objectMapper;
  22. /**
  23. *模糊匹配rowkey
  24. */
  25. public String[] findRowKeys(String tableName, String rowkeyRegEx) throws Exception {
  26. Scan scan = new Scan();
  27. scan.addFamily(Bytes.toBytes("basic"));
  28. scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(rowkeyRegEx)));
  29. List<String> list = new LinkedList<>();
  30. hbaseTemplate.find(tableName, scan, new RowMapper<Void>() {
  31. @Override
  32. public Void mapRow(Result result, int rowNum) throws Exception {
  33. list.add(Bytes.toString(result.getRow()));
  34. return null;
  35. }
  36. });
  37. return list.toArray(new String[list.size()]);
  38. }
  39. /**
  40. *表总条数
  41. */
  42. public Integer count(String tableName) throws Exception {
  43. Scan scan = new Scan();
  44. scan.addFamily(Bytes.toBytes("basic"));
  45. scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^")));
  46. List<String> list = new LinkedList<>();
  47. hbaseTemplate.find(tableName, scan, new RowMapper<Void>() {
  48. @Override
  49. public Void mapRow(Result result, int rowNum) throws Exception {
  50. list.add(Bytes.toString(result.getRow()));
  51. return null;
  52. }
  53. });
  54. return list.size();
  55. }
  56. /**
  57. * 根据 rowkey获取一条记录
  58. */
  59. public String get(String tableName, String rowkey) {
  60. return hbaseTemplate.get(tableName, rowkey,new RowMapper<String>() {
  61. public String mapRow(Result result, int rowNum) throws Exception {
  62. if(!result.isEmpty())
  63. {
  64. List<Cell> ceList = result.listCells();
  65. Map<String, Object> map = new HashMap<String, Object>();
  66. map.put("rowkey",rowkey);
  67. if (ceList != null && ceList.size() > 0) {
  68. for (Cell cell : ceList) {
  69. //默认不加列族
  70. // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_"
  71. map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
  72. Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
  73. }
  74. }
  75. return objectMapper.writeValueAsString(map);
  76. }
  77. else{
  78. return "";
  79. }
  80. }
  81. });
  82. }
  83. /**
  84. * 通过表名 key 和 列族 和列 获取一个数据
  85. */
  86. public String get(String tableName ,String rowkey, String familyName, String qualifier) {
  87. return hbaseTemplate.get(tableName, rowkey,familyName,qualifier ,new RowMapper<String>(){
  88. public String mapRow(Result result, int rowNum) throws Exception {
  89. List<Cell> ceList = result.listCells();
  90. String res = "";
  91. if(ceList!=null&&ceList.size()>0){
  92. for(Cell cell:ceList){
  93. res = Bytes.toString( cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
  94. }
  95. }
  96. return res;
  97. }
  98. });
  99. }
  100. /**
  101. * 通过rowkey获取某行数据
  102. */
  103. public Result getResult(String tableName, String rowKey) throws Exception {
  104. return hbaseTemplate.get(tableName, rowKey, new RowMapper<Result>() {
  105. public Result mapRow(Result result, int rowNum) throws Exception {
  106. return result;
  107. }
  108. });
  109. }
  110. /**
  111. * 通过rowkey获取多行数据
  112. */
  113. public Result[] getResultList(String tableName,List<String> rowKeys) throws Exception {
  114. return hbaseTemplate.execute(tableName, new TableCallback<Result[]>() {
  115. public Result[] doInTable(HTableInterface table) throws Throwable {
  116. List<Get> list = new ArrayList<Get>();
  117. for (String rowKey : rowKeys) {
  118. Get get = new Get(Bytes.toBytes(rowKey));
  119. list.add(get);
  120. }
  121. return table.get(list);
  122. }
  123. });
  124. }
  125. /**
  126. * 通过rowkey获取某行数据
  127. */
  128. public Map<String, Object> getResultMap(String tableName, String rowKey) throws Exception {
  129. return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Object>>() {
  130. public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {
  131. Map<String, Object> map = null;
  132. if(result!=null) {
  133. List<Cell> ceList = result.listCells();
  134. if (ceList != null && ceList.size() > 0) {
  135. map = new HashMap<String, Object>();
  136. map.put("rowkey", rowKey);
  137. for (Cell cell : ceList) {
  138. //默认不加列族
  139. // Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) +"_"
  140. map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
  141. Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
  142. }
  143. }
  144. }
  145. return map;
  146. }
  147. });
  148. }
  149. /**
  150. * 修改某行某列值
  151. */
  152. public void put(String tableName ,String rowkey, String familyName, String qualifier,String value) throws Exception
  153. {
  154. hbaseTemplate.execute(tableName, new TableCallback<String>() {
  155. public String doInTable(HTableInterface table) throws Throwable {
  156. Put p = new Put(rowkey.getBytes());
  157. p.add(familyName.getBytes(), qualifier.getBytes(), value.getBytes());
  158. table.put(p);
  159. return null;
  160. }
  161. });
  162. }
  163. /**
  164. * 新增行
  165. */
  166. public void add(String tableName , String rowkey, Map<String,Map<String,String>> family) throws Exception
  167. {
  168. hbaseTemplate.execute(tableName, new TableCallback<String>() {
  169. public String doInTable(HTableInterface table) throws Throwable {
  170. Put p = new Put(rowkey.getBytes());
  171. for(String familyName : family.keySet())
  172. {
  173. Map<String,String> map = family.get(familyName);
  174. for (String qualifier : map.keySet())
  175. {
  176. String value = map.get(qualifier);
  177. if(value == null){
  178. continue;
  179. }
  180. p.add(familyName.getBytes(), qualifier.getBytes(), value.getBytes());
  181. }
  182. }
  183. table.put(p);
  184. return null;
  185. }
  186. });
  187. }
  188. /**
  189. * 新增数据
  190. */
  191. public void add(String tableName, String rowKey, String family, Object[] columns, Object[] values) throws Exception {
  192. hbaseTemplate.execute(tableName, new TableCallback<Object>() {
  193. public Object doInTable(HTableInterface htable) throws Throwable {
  194. Put put = new Put(Bytes.toBytes(rowKey));
  195. for (int j = 0; j < columns.length; j++) {
  196. //为空字段不保存
  197. if(values[j]!=null)
  198. {
  199. String column = String.valueOf(columns[j]);
  200. String value = String.valueOf(values[j]);
  201. put.addColumn(Bytes.toBytes(family),
  202. Bytes.toBytes(column),
  203. Bytes.toBytes(value));
  204. }
  205. }
  206. htable.put(put);
  207. return null;
  208. }
  209. });
  210. }
  211. /**
  212. * 根据 rowkey删除一条记录
  213. */
  214. public void delete(String tableName, String rowkey) {
  215. hbaseTemplate.execute(tableName, new TableCallback<String>() {
  216. public String doInTable(HTableInterface table) throws Throwable {
  217. Delete d = new Delete(rowkey.getBytes());
  218. table.delete(d);
  219. return null;
  220. }
  221. });
  222. }
  223. /**
  224. * 批量删除数据
  225. */
  226. public Object[] deleteBatch(String tableName, String[] rowKeys) throws Exception {
  227. return hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
  228. public Object[] doInTable(HTableInterface table) throws Throwable {
  229. List<Delete> deletes = new ArrayList<>(rowKeys.length);
  230. for (String rowKey : rowKeys) {
  231. Delete delete = new Delete(Bytes.toBytes(rowKey));
  232. deletes.add(delete);
  233. }
  234. Object[] results = new Object[deletes.size()];
  235. table.batch(deletes, results);
  236. return results;
  237. }
  238. });
  239. }
  240. /**
  241. * 删除列族
  242. */
  243. public void deleteFamily(String tableName, String rowKey, String familyName) throws Exception {
  244. hbaseTemplate.delete(tableName, rowKey, familyName);
  245. }
  246. /**
  247. * 删除某列
  248. */
  249. public void deleteColumn(String tableName, String rowKey, String familyName, String columnName) throws Exception {
  250. hbaseTemplate.delete(tableName, rowKey, familyName, columnName);
  251. }
  252. /************************************* Bean使用原型模式 ***************************************************************/
  253. /**
  254. * 保存数据 原型模式
  255. */
  256. public void save(String tableName, TableBundle tableBundle) throws Exception {
  257. hbaseTemplate.execute(tableName, new TableCallback<Object>() {
  258. public Object doInTable(HTableInterface htable) throws Throwable {
  259. List<Put> puts = tableBundle.putOperations();
  260. Object[] results = new Object[puts.size()];
  261. htable.batch(puts, results);
  262. return null;
  263. }
  264. });
  265. }
  266. /**
  267. * 查询数据 原型模式
  268. */
  269. public Object[] get(String tableName, TableBundle tableBundle) {
  270. return hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
  271. public Object[] doInTable(HTableInterface table) throws Throwable {
  272. List<Get> gets = tableBundle.getOperations();
  273. Object[] results = new Object[gets.size()];
  274. table.batch(gets, results);
  275. if (results.length > 0 && results[0].toString().equals("keyvalues=NONE")) return null;
  276. return results;
  277. }
  278. });
  279. }
  280. /**
  281. * 删除数据 原型模式
  282. */
  283. public void delete(String tableName, TableBundle tableBundle) {
  284. hbaseTemplate.execute(tableName, new TableCallback<Object[]>() {
  285. public Object[] doInTable(HTableInterface table) throws Throwable {
  286. List<Delete> deletes = tableBundle.deleteOperations();
  287. Object[] results = new Object[deletes.size()];
  288. table.batch(deletes, results);
  289. return null;
  290. }
  291. });
  292. }
  293. }