HbaseFactory.java 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package com.yihu.base.hbase.config;
  2. import com.yihu.base.hbase.properties.HbaseProperties;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.hbase.HBaseConfiguration;
  5. import org.apache.hadoop.hbase.HColumnDescriptor;
  6. import org.apache.hadoop.hbase.HTableDescriptor;
  7. import org.apache.hadoop.hbase.TableName;
  8. import org.apache.hadoop.hbase.client.*;
  9. import org.springframework.data.hadoop.hbase.HbaseTemplate;
  10. import org.springframework.data.hadoop.hbase.TableCallback;
  11. import org.springframework.util.StringUtils;
  12. import java.io.IOException;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. import java.util.Map;
  16. /**
  17. * Created by chenweida on 2018/2/27.
  18. */
  19. public class HbaseFactory {
  20. private HbaseTemplate hbaseTemplate = new HbaseTemplate();
  21. private HbaseProperties hbaseProperties;
  22. private volatile Configuration configuration;
  23. public HbaseFactory(HbaseProperties hbaseProperties) {
  24. this.hbaseProperties = hbaseProperties;
  25. }
  26. /**
  27. * 批量新增行
  28. */
  29. public void addLogBulk(String tableName, List<String> rowkeyList, List<Map<String, Map<String, String>>> familyList) throws Exception {
  30. hbaseTemplate.execute(tableName, new TableCallback<String>() {
  31. @Override
  32. public String doInTable(HTableInterface table) throws Throwable {
  33. List<Put> list = new ArrayList<>();
  34. for (int i = 0; i < rowkeyList.size(); i++) {
  35. Put p = new Put(rowkeyList.get(i).getBytes());
  36. Map<String, Map<String, String>> family = familyList.get(i);
  37. for (String familyName : family.keySet()) {
  38. Map<String, String> map = family.get(familyName);
  39. for (String qualifier : map.keySet()) {
  40. String value = map.get(qualifier);
  41. if (value == null) {
  42. continue;
  43. }
  44. p.add(familyName.getBytes(), qualifier.getBytes(), value.getBytes());
  45. }
  46. }
  47. list.add(p);
  48. }
  49. table.put(list);
  50. return null;
  51. }
  52. });
  53. }
  54. public void init() {
  55. Connection connection = null;
  56. HBaseAdmin hBaseAdmin = null;
  57. try {
  58. //获取链接
  59. connection = getConnection();
  60. hBaseAdmin = (HBaseAdmin) connection.getAdmin();
  61. //判断表名是否存在
  62. if (!hBaseAdmin.tableExists(hbaseProperties.getTableName())) {
  63. //创建表
  64. createTable(hbaseProperties);
  65. }
  66. } catch (Exception e) {
  67. e.printStackTrace();
  68. } finally {
  69. try {
  70. if (hBaseAdmin != null) {
  71. hBaseAdmin.close();
  72. }
  73. } catch (IOException e) {
  74. e.printStackTrace();
  75. }
  76. try {
  77. if (connection != null) {
  78. connection.close();
  79. }
  80. } catch (IOException e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. }
  85. /**
  86. * 获取链接
  87. *
  88. * @return
  89. * @throws IOException
  90. */
  91. private Connection getConnection() throws IOException {
  92. if (configuration == null) {
  93. synchronized (HbaseFactory.class) {
  94. if (configuration == null) {
  95. //设置hadoop账号
  96. System.setProperty("HADOOP_USER_NAME", hbaseProperties.getHdfsUserName());
  97. configuration = HBaseConfiguration.create();
  98. configuration.set("hbase.zookeeper.quorum", hbaseProperties.getZkHosts());
  99. if(StringUtils.isEmpty(hbaseProperties.getZkZnodeParent())){
  100. hbaseProperties.setZkZnodeParent(HbaseProperties.default_zkZnodeParent);
  101. }
  102. configuration.set("zookeeper.znode.parent",hbaseProperties.getZkZnodeParent());
  103. if(StringUtils.isEmpty(hbaseProperties.getZkPort())){
  104. hbaseProperties.setZkPort(HbaseProperties.default_zkPort);
  105. }
  106. configuration.set("hbase.zookeeper.property.clientPort",hbaseProperties.getZkPort());
  107. hbaseTemplate.setConfiguration(configuration);
  108. }
  109. }
  110. }
  111. return ConnectionFactory.createConnection(configuration);
  112. }
  113. /**
  114. * 创建表
  115. *
  116. * @param hbaseProperties
  117. * @throws Exception
  118. */
  119. private void createTable(HbaseProperties hbaseProperties) throws Exception {
  120. Connection connection = getConnection();
  121. HBaseAdmin hBaseAdmin = (HBaseAdmin) connection.getAdmin();
  122. HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(hbaseProperties.getTableName()));
  123. //最多建议1-3个列族
  124. for (String family : hbaseProperties.getFamilyNames()) {
  125. HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(family);
  126. hColumnDescriptor.setBlockCacheEnabled(true);//开始读内存缓存
  127. hColumnDescriptor.setInMemory(true);//是否加载到内存
  128. hColumnDescriptor.setMaxVersions(1);//版本数1
  129. hTableDescriptor.addFamily(hColumnDescriptor);
  130. }
  131. hBaseAdmin.createTable(hTableDescriptor);
  132. hBaseAdmin.close();
  133. connection.close();
  134. }
  135. }