Browse Source

优化调整 当字段为细表数据时,需要指定主表 ID prifie_id

jkzlzhoujie 6 years ago
parent
commit
5cbdc29d49
1 changed files with 40 additions and 2 deletions
  1. 40 2
      src/main/java/com/yihu/ehr/listener/HbaseLogListener.java

+ 40 - 2
src/main/java/com/yihu/ehr/listener/HbaseLogListener.java

@ -4,12 +4,17 @@ import com.google.gson.Gson;
import com.ngdata.sep.EventListener;
import com.ngdata.sep.SepEvent;
import com.yihu.ehr.kafka.Producer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -23,6 +28,8 @@ public class HbaseLogListener implements EventListener {
    private static String action_del = "DeleteColumn";//删除单个字段值
    private static String action_delFamily = "DeleteFamily";//删除整行
    private static String hbase = "Hbase";
//    @Autowired
//    private HBaseDao hBaseDao;
    @Override
    public void processEvents(List<SepEvent> sepEvents) {
@ -33,9 +40,22 @@ public class HbaseLogListener implements EventListener {
            log.debug("  table = " + Bytes.toString(sepEvent.getTable()));
            log.debug("  row = " + Bytes.toString(sepEvent.getRow()));
            Map<String, Object> dataMap = new HashMap<String, Object>();
            String table = Bytes.toString(sepEvent.getTable());
            String rowKey = Bytes.toString(sepEvent.getRow());
            dataMap.put("dataSource",hbase);
            dataMap.put("table",Bytes.toString(sepEvent.getTable()));
            dataMap.put("rowKey",Bytes.toString(sepEvent.getRow()));
            dataMap.put("table",table);
            dataMap.put("rowKey",rowKey);
            if(table.equals("HealtharchiveSub")){
                //当字段未细表时,需要指定主表ID
                try {
                    Map<String,Object> map =  queryDataByRowKey(table,rowKey);
                    if(map.get("profile_id") != null){
                        dataMap.put("profile_id",map.get("profile_id"));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            for (Cell cell : sepEvent.getKeyValues()) {
                String cloumn = Bytes.toString(CellUtil.cloneQualifier(cell));
                String value = Bytes.toString(CellUtil.cloneValue(cell));
@ -57,4 +77,22 @@ public class HbaseLogListener implements EventListener {
        }
        log.info("sep Events end");
    }
    public Map<String,Object> queryDataByRowKey(String tableNameString,String rowkey) throws IOException {
        Map<String,Object> map = new HashMap<>();
        Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.setBoolean("hbase.replication", true);
        Connection connection = ConnectionFactory.createConnection(hbaseConf);
        Table table = connection.getTable(TableName.valueOf(tableNameString));
        Get get = new Get(rowkey.getBytes());
        //按行查询数据
        Result result = table.get(get);
        List<Cell> listCells = result.listCells();
        for (Cell cell : listCells) {
            String row = Bytes.toString(CellUtil.cloneRow(cell));
            String val = Bytes.toString(CellUtil.cloneValue(cell));
            map.put(row,val);
        }
        return map;
    }
}