Browse Source

优化调整

jkzlzhoujie 6 years ago
parent
commit
483af2e5d5

+ 16 - 7
src/main/java/com/yihu/ehr/controller/ConsumerController.java

@@ -9,6 +9,7 @@ import com.ngdata.sep.impl.SepModelImpl;
import com.ngdata.sep.util.zookeeper.ZkUtil;
import com.ngdata.sep.util.zookeeper.ZooKeeperItf;
import com.yihu.ehr.listener.HbaseLogListener;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Bytes;
@@ -46,12 +47,18 @@ public class ConsumerController {
     * columnQualifier 列限定符
     */
    @RequestMapping(value = "/start", method = RequestMethod.GET)
    public void start(
            @RequestParam(value = "table") String table,
            @RequestParam(value = "columnFamily") String columnFamily,
            @RequestParam(value = "columnQualifier") String columnQualifier
    public String start(
            @RequestParam(value = "table" ,required = true) String table,
            @RequestParam(value = "columnFamily",required = false) String columnFamily,
            @RequestParam(value = "columnQualifier",required = false) String columnQualifier
    ){
        try {
            if(StringUtils.isEmpty(columnFamily)){
                columnFamily = "";
            }
            if(StringUtils.isEmpty(columnFamily)){
                columnQualifier = "";
            }
            Configuration hbaseConf = HBaseConfiguration.create();
            hbaseConf.setBoolean("hbase.replication", true);
            ZooKeeperItf zk = ZkUtil.connect(zookeeperHost, 20000);
@@ -64,24 +71,26 @@ public class ConsumerController {
            sepConsumer = new SepConsumer(subscriptionName, 5000, hbaseLogListener, 1, localhost, zk, hbaseConf,payloadExtractor);
            sepConsumer.start();
            log.debug("started");
            System.out.println("Started");
            return "success";
        }catch (Exception e){
            log.debug(e.getMessage());
        }
        return "fail";
    }
    /**
     * 停止消费
     */
    @RequestMapping(value = "/stop", method = RequestMethod.GET)
    public void stop(){
    public String stop(){
        try {
            sepConsumer.stop();
            log.debug("stop");
            System.out.println("Stop");
            return "success";
        }catch (Exception e){
            log.debug(e.getMessage());
        }
        return "fail";
    }
}

+ 3 - 5
src/main/java/com/yihu/ehr/listener/HbaseLogListener.java

@@ -28,8 +28,6 @@ public class HbaseLogListener implements EventListener {
    private static String action_del = "DeleteColumn";//删除单个字段值
    private static String action_delFamily = "DeleteFamily";//删除整行
    private static String hbase = "Hbase";
//    @Autowired
//    private HBaseDao hBaseDao;
    @Override
    public void processEvents(List<SepEvent> sepEvents) {
@@ -44,13 +42,13 @@ public class HbaseLogListener implements EventListener {
            String rowKey = Bytes.toString(sepEvent.getRow());
            dataMap.put("dataSource",hbase);
            dataMap.put("table",table);
            dataMap.put("rowKey",rowKey);
            dataMap.put("rowkey",rowKey);
            if(table.equals("HealtharchiveSub")){
                //当字段未细表时,需要指定主表ID
                try {
                    Map<String,Object> map =  queryDataByRowKey(table,rowKey);
                    if(map.get("profile_id") != null){
                        dataMap.put("profile_id",map.get("profile_id"));
                    if(map.get("profileId") != null){
                        dataMap.put("profile_id",map.get("profileId"));
                    }
                } catch (IOException e) {
                    e.printStackTrace();

+ 3 - 3
src/main/java/com/yihu/ehr/sep/DemoSchema.java

@@ -33,11 +33,11 @@ public class DemoSchema {
    public static void createSchema(Configuration hbaseConf) throws IOException {
        Admin admin = ConnectionFactory.createConnection(hbaseConf).getAdmin();
        if (!admin.tableExists(TableName.valueOf("sep-user-demo"))) {
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf("sep-user-demo"));
        if (!admin.tableExists(TableName.valueOf("Healtharchive"))) {
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf("Healtharchive"));
            HColumnDescriptor infoCf = new HColumnDescriptor("info");
            infoCf.setScope(1);
            infoCf.setScope(1);//集群上打开表student的复制特性
            tableDescriptor.addFamily(infoCf);
            admin.createTable(tableDescriptor);

+ 8 - 45
src/main/java/com/yihu/ehr/sep/LoggingConsumer.java

@@ -29,20 +29,16 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * A simple consumer that just logs the events.
@@ -53,18 +49,9 @@ public class LoggingConsumer {
    private static String localhost = "192.168.131.109";        //localhost
    private static String zookeeperHost = "master";             //zookeeper
    private static String subscriptionName = "logger";
    private static byte[] table = Bytes.toBytes("sep-user-demo");       //hbase
    private static byte[] table = Bytes.toBytes("HealtharchiveSub");//sep-user-demo       //hbase
    private static byte[]  columnFamily = Bytes.toBytes("");        //
    private static byte[]  columnQualifier = Bytes.toBytes("payload");  //
    private static String index = "medical_service_index";
    private static String type = "medical_service";
    private static byte[]  columnQualifier = Bytes.toBytes("");  //
    public static void main(String[] args) throws Exception {
        Logger log = LoggerFactory.getLogger(LoggingConsumer.class);
@@ -90,39 +77,14 @@ public class LoggingConsumer {
        }
    }
    private static Producer createProducer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "192.168.131.234:2181");//
        properties.put("serializer.class", StringEncoder.class.getName());
        properties.put("bootstrap.servers", "192.168.131.234:9092");//
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        return producer;
    }
    private static void sendMessage(Producer producer ,String topic,String message){
//            for (int i = 0; i < 100; i++) {
//                ProducerRecord producerRecord = new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i));
//                producer.send(producerRecord);
//            }
        producer.send(new ProducerRecord<Integer, String>(topic, "message: " + message));
        producer.close();
    }
    private static class EventLogger implements EventListener {
        private static String topic = "sep-hbase-data";
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
        Logger log = LoggerFactory.getLogger(LoggingConsumer.class);
        @Override
        public void processEvents(List<SepEvent> sepEvents) {
            log.info(" sep Events get listener");
            Producer producer =  createProducer();
            for (SepEvent sepEvent : sepEvents) {
                System.out.println("Received event:");
                System.out.println("  table = " + Bytes.toString(sepEvent.getTable()));
@@ -133,14 +95,15 @@ public class LoggingConsumer {
                for (Cell cell : sepEvent.getKeyValues()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    log.debug("hbase  cloumn = " + key);
                    log.debug("hbase  value = " + value);
                    System.out.println(cell);
                    System.out.println("  key = " + key);
                    System.out.println("  value = " + value);
                    sendMessage(producer,topic,key + ":" + value);
//                    kafkaTemplate.send(topic,"你","好");
                    json.put(key, value);
                }
            }
            log.info("sep Events end");
        }