|
@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.web.bind.annotation.RequestMapping;
|
|
|
import org.springframework.web.bind.annotation.RequestMethod;
|
|
|
import org.springframework.web.bind.annotation.RequestParam;
|
|
|
import org.springframework.web.bind.annotation.RestController;
|
|
|
|
|
|
/**
|
|
@ -34,16 +35,22 @@ public class ConsumerController {
|
|
|
private String zookeeperHost;
|
|
|
|
|
|
private static String subscriptionName = "logger";
|
|
|
private static byte[] table = Bytes.toBytes("sep-user-demo"); //hbase
|
|
|
private static byte[] columnFamily = Bytes.toBytes(""); //columnFamily
|
|
|
private static byte[] columnQualifier = Bytes.toBytes("payload"); //列限定符
|
|
|
private SepConsumer sepConsumer;
|
|
|
|
|
|
/**
|
|
|
* hbase 执行动作 日志监听
|
|
|
* 开启消费
|
|
|
* param
|
|
|
* table 表
|
|
|
* columnFamily 列族
|
|
|
* columnQualifier 列限定符
|
|
|
*/
|
|
|
@RequestMapping(value = "/start", method = RequestMethod.GET)
|
|
|
public void start(){
|
|
|
public void start(
|
|
|
@RequestParam(value = "table") String table,
|
|
|
@RequestParam(value = "columnFamily") String columnFamily,
|
|
|
@RequestParam(value = "columnQualifier") String columnQualifier
|
|
|
){
|
|
|
try {
|
|
|
Configuration hbaseConf = HBaseConfiguration.create();
|
|
|
hbaseConf.setBoolean("hbase.replication", true);
|
|
@ -52,15 +59,12 @@ public class ConsumerController {
|
|
|
if (!sepModel.hasSubscription(subscriptionName)) {
|
|
|
sepModel.addSubscriptionSilent(subscriptionName);
|
|
|
}
|
|
|
PayloadExtractor payloadExtractor = new BasePayloadExtractor(table, columnFamily,columnQualifier);
|
|
|
PayloadExtractor payloadExtractor = new BasePayloadExtractor(Bytes.toBytes(table), Bytes.toBytes(columnFamily),Bytes.toBytes(columnQualifier));//配置成参数
|
|
|
HbaseLogListener hbaseLogListener = new HbaseLogListener();
|
|
|
sepConsumer = new SepConsumer(subscriptionName, 5000, hbaseLogListener, 1, localhost, zk, hbaseConf,payloadExtractor);
|
|
|
sepConsumer.start();
|
|
|
log.debug("started");
|
|
|
System.out.println("Started");
|
|
|
while (true) {
|
|
|
Thread.sleep(Long.MAX_VALUE);
|
|
|
}
|
|
|
}catch (Exception e){
|
|
|
log.debug(e.getMessage());
|
|
|
}
|