ソースを参照

Merge branch 'dev' of chenweida/jkzl-start into dev

chenweida 7 年 前
コミット
edb067df61
30 ファイル変更1145 行追加37 行削除
  1. 46 1
      common-logback-starter/pom.xml
  2. 72 0
      common-logback-starter/src/main/java/com.yihu.base/common/RollingUtil.java
  3. 5 5
      common-logback-starter/src/main/java/com.yihu.base/es/ElasticsearchAppender.java
  4. 3 3
      common-logback-starter/src/main/java/com.yihu.base/es/buffer/BufferConsumer.java
  5. 2 2
      common-logback-starter/src/main/java/com.yihu.base/es/buffer/EventBuffer.java
  6. 2 14
      common-logback-starter/src/main/java/com.yihu.base/es/properties/ElasticsearchProperties.java
  7. 82 0
      common-logback-starter/src/main/java/com.yihu.base/hbase/HbaseAppender.java
  8. 76 0
      common-logback-starter/src/main/java/com.yihu.base/hbase/buffer/HBaseBufferConsumer.java
  9. 31 0
      common-logback-starter/src/main/java/com.yihu.base/hbase/buffer/HBaseEventBuffer.java
  10. 153 0
      common-logback-starter/src/main/java/com.yihu.base/hbase/config/HbaseFactory.java
  11. 27 0
      common-logback-starter/src/main/java/com.yihu.base/hbase/properties/BufferProperties.java
  12. 28 0
      common-logback-starter/src/main/java/com.yihu.base/hbase/properties/HbaseAppenderProperties.java
  13. 78 0
      common-logback-starter/src/main/java/com.yihu.base/hbase/properties/HbaseProperties.java
  14. 9 0
      common-logback-starter/src/main/java/com.yihu.base/hbase/rowkey/IRowkeyGenerate.java
  15. 48 0
      common-logback-starter/src/main/java/com.yihu.base/hbase/rowkey/RowkeyFactory.java
  16. 15 0
      common-logback-starter/src/main/java/com.yihu.base/hbase/rowkey/impl/UUIDRowkeyGenerate.java
  17. 12 0
      common-logback-starter/src/main/java/com.yihu.base/hbase/timer/MajorTimer.java
  18. 74 0
      common-logback-starter/src/main/java/com.yihu.base/hdfs/HDFSAppender.java
  19. 55 0
      common-logback-starter/src/main/java/com.yihu.base/hdfs/buffer/HDFSBufferConsumer.java
  20. 31 0
      common-logback-starter/src/main/java/com.yihu.base/hdfs/buffer/HDFSEventBuffer.java
  21. 27 0
      common-logback-starter/src/main/java/com.yihu.base/hdfs/properties/BufferProperties.java
  22. 27 0
      common-logback-starter/src/main/java/com.yihu.base/hdfs/properties/HDFSAppenderProperties.java
  23. 55 0
      common-logback-starter/src/main/java/com.yihu.base/hdfs/properties/HDFSProperties.java
  24. 102 0
      common-logback-starter/src/main/java/com.yihu.base/hdfs/util/HDFSUtil.java
  25. 28 0
      common-logback-starter/src/main/resources/hbaseAppender_logback_demo.xml
  26. 34 0
      common-logback-starter/src/main/resources/hdfsAppender_logback_demo.xml
  27. 11 1
      demo/pom.xml
  28. 1 2
      demo/src/main/java/com/demo/controller/DemoController.java
  29. 10 8
      demo/src/main/resources/logback.xml
  30. 1 1
      pom.xml

+ 46 - 1
common-logback-starter/pom.xml

@ -11,13 +11,49 @@
    </parent>
    <artifactId>common-logback-starter</artifactId>
    <version>1.0.1</version>
    <version>1.0.2</version>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-hbase</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-protocol</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
@ -34,5 +70,14 @@
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>
    </dependencies>
</project>

+ 72 - 0
common-logback-starter/src/main/java/com.yihu.base/common/RollingUtil.java

@ -0,0 +1,72 @@
package com.yihu.base.common;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.temporal.TemporalAdjusters;
/**
 * Created by chenweida on 2018/2/26.
 */
public class RollingUtil {

    /**
     * Appends the start date of the rolling period to the end of a name,
     * e.g. "log" + day rolling -> "log-2017-01-01".
     *
     * @param name    base name (index/file name)
     * @param rolling rolling unit: "day", "week", "month" or "year"; anything else is ignored
     * @return name suffixed with the period start date, or the plain name when no rolling applies
     */
    public static String getRollingAppendLast(String name, String rolling) {
        String date = rollingDate(rolling);
        return date == null ? name : name + "-" + date;
    }

    /**
     * Prepends the start date of the rolling period to a name, e.g. "2017-01-01-log".
     *
     * @param name    base name
     * @param rolling rolling unit: "day", "week", "month" or "year"
     * @return date-prefixed name, or the plain name when no rolling applies
     */
    public static String getRollingAppendFirst(String name, String rolling) {
        return getRollingAppendFirst(name, rolling, "");
    }

    /**
     * Prepends the start date of the rolling period to a name and appends an
     * optional file suffix, e.g. "2017-01-01-log.txt".
     *
     * BUG FIX: the original used {@code File.pathSeparator} (":" or ";") as the
     * suffix separator although its own comment said it should add a "." — the
     * suffix is now joined with a dot. A null suffix no longer produces "null"
     * in the result.
     *
     * @param name    base name
     * @param rolling rolling unit: "day", "week", "month" or "year"
     * @param suffix  file extension without the leading dot; may be null/empty
     * @return date-prefixed (and suffixed) name, or name + suffix when no rolling applies
     */
    public static String getRollingAppendFirst(String name, String rolling, String suffix) {
        String tail = (suffix == null || suffix.isEmpty()) ? "" : "." + suffix;
        String date = rollingDate(rolling);
        return date == null ? name + tail : date + "-" + name + tail;
    }

    /**
     * Returns the ISO date string for the start of the current rolling period,
     * or null when the rolling unit is empty/unknown.
     */
    private static String rollingDate(String rolling) {
        if (rolling == null || rolling.isEmpty()) {
            return null;
        }
        switch (rolling) {
            case "day":
                return LocalDate.now().toString();
            case "week":
                // Monday of the current ISO week.
                return LocalDate.now().with(DayOfWeek.MONDAY).toString();
            case "month":
                return LocalDate.now().with(TemporalAdjusters.firstDayOfMonth()).toString();
            case "year":
                return LocalDate.now().with(TemporalAdjusters.firstDayOfYear()).toString();
            default:
                return null;
        }
    }
}

+ 5 - 5
common-logback-starter/src/main/java/com.yihu.base/es/ElasticsearchAppender.java

@ -2,8 +2,8 @@ package com.yihu.base.es;
import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.yihu.base.es.buffer.BufferConsumer;
import com.yihu.base.es.buffer.EventBuffer;
import com.yihu.base.es.buffer.EsBufferConsumer;
import com.yihu.base.es.buffer.EsEventBuffer;
import com.yihu.base.es.config.ElasticSearchConnectionFactiory;
import com.yihu.base.es.properties.ElasticsearchAppenderProperties;
@ -13,7 +13,7 @@ public class ElasticsearchAppender extends AppenderBase<LoggingEvent> {
    //消费者
    private Thread bufferConsumerThread = null;
    //缓冲队列
    private EventBuffer eventBuffer = null;
    private EsEventBuffer eventBuffer = null;
    public ElasticsearchAppender() {
    }
@ -23,11 +23,11 @@ public class ElasticsearchAppender extends AppenderBase<LoggingEvent> {
    public void start() {
        super.start();
        //初始化内存缓冲队列
        eventBuffer = new EventBuffer(elasticsearchAppenderProperties.getBufferProperties());
        eventBuffer = new EsEventBuffer(elasticsearchAppenderProperties.getBufferProperties());
        //初始化ES连接池
        ElasticSearchConnectionFactiory.init(elasticsearchAppenderProperties.getElasticsearchProperties());
        //启动消费者
        BufferConsumer bufferConsumer = new BufferConsumer(eventBuffer, elasticsearchAppenderProperties);
        EsBufferConsumer bufferConsumer = new EsBufferConsumer(eventBuffer, elasticsearchAppenderProperties);
        bufferConsumerThread = new Thread(bufferConsumer);
        bufferConsumerThread.start();
    }

+ 3 - 3
common-logback-starter/src/main/java/com.yihu.base/es/buffer/BufferConsumer.java

@ -15,15 +15,15 @@ import java.util.List;
/**
 * Created by chenweida on 2018/2/24.
 */
public class BufferConsumer implements Runnable {
public class EsBufferConsumer implements Runnable {
    //缓冲区
    private EventBuffer eventBuffer;
    private EsEventBuffer eventBuffer;
    //消费者相关配置
    private ElasticsearchAppenderProperties elasticsearchAppenderProperties;
    //格式化日志数据
    private PatternLayout patternLayout = new PatternLayout();
    public BufferConsumer(EventBuffer eventBuffer, ElasticsearchAppenderProperties elasticsearchAppenderProperties) {
    public EsBufferConsumer(EsEventBuffer eventBuffer, ElasticsearchAppenderProperties elasticsearchAppenderProperties) {
        this.eventBuffer = eventBuffer;
        this.elasticsearchAppenderProperties = elasticsearchAppenderProperties;
    }

+ 2 - 2
common-logback-starter/src/main/java/com.yihu.base/es/buffer/EventBuffer.java

@ -9,11 +9,11 @@ import java.util.concurrent.BlockingQueue;
/**
 * Created by chenweida on 2018/2/24.
 */
public class EventBuffer {
public class EsEventBuffer {
    //缓冲队列
    BlockingQueue queue = null;
    public EventBuffer(BufferProperties bufferProperties) {
    public EsEventBuffer(BufferProperties bufferProperties) {
        queue = new ArrayBlockingQueue(bufferProperties.getBufferSize());
    }

+ 2 - 14
common-logback-starter/src/main/java/com.yihu.base/es/properties/ElasticsearchProperties.java

@ -1,11 +1,10 @@
package com.yihu.base.es.properties;
import org.apache.commons.lang3.StringUtils;
import com.yihu.base.common.RollingUtil;
import java.text.SimpleDateFormat;
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.temporal.TemporalAdjusters;
/**
 * Created by chenweida on 2018/2/24.
@ -43,18 +42,7 @@ public class ElasticsearchProperties {
    }
    public String getIndex() {
        if (StringUtils.isNoneEmpty(rolling)) {
            if ("day".equals(rolling)) {
                return new StringBuffer(index + "-" + LocalDate.now().toString()).toString();
            } else if ("week".equals(rolling)) {
                return new StringBuffer(index + "-" + LocalDate.now().minusWeeks(0).with(DayOfWeek.MONDAY)).toString();
            } else if ("month".equals(rolling)) {
                return new StringBuffer(index + "-" + LocalDate.now().with(TemporalAdjusters.firstDayOfMonth())).toString();
            } else if ("year".equals(rolling)) {
                return new StringBuffer(index + "-" + LocalDate.now().with(TemporalAdjusters.firstDayOfYear())).toString();
            }
        }
        return index;
        return RollingUtil.getRollingAppendLast(index,rolling);
    }
    public void setIndex(String index) {

+ 82 - 0
common-logback-starter/src/main/java/com.yihu.base/hbase/HbaseAppender.java

@ -0,0 +1,82 @@
package com.yihu.base.hbase;
import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.yihu.base.hbase.buffer.HBaseBufferConsumer;
import com.yihu.base.hbase.buffer.HBaseEventBuffer;
import com.yihu.base.hbase.config.HbaseFactory;
import com.yihu.base.hbase.properties.HbaseAppenderProperties;
public class HbaseAppender extends AppenderBase<LoggingEvent> {
    // Aggregated configuration: HBase connection settings plus buffer settings.
    private HbaseAppenderProperties hbaseAppenderProperties = new HbaseAppenderProperties();
    // Background thread that drains the buffer into HBase.
    private Thread bufferConsumerThread = null;
    // In-memory queue decoupling logging calls from HBase writes.
    private HBaseEventBuffer eventBuffer = null;
    // Owns the HBase configuration and connections.
    private HbaseFactory hbaseFactory = null;

    /**
     * Starts the appender: builds the in-memory buffer, initializes the HBase
     * factory, then launches the consumer thread that flushes events to HBase.
     */
    @Override
    public void start() {
        super.start();
        eventBuffer = new HBaseEventBuffer(hbaseAppenderProperties.getBufferProperties());
        hbaseFactory = new HbaseFactory(hbaseAppenderProperties.getHbaseProperties());
        hbaseFactory.init();
        HBaseBufferConsumer consumer =
                new HBaseBufferConsumer(eventBuffer, hbaseAppenderProperties, hbaseFactory);
        bufferConsumerThread = new Thread(consumer);
        bufferConsumerThread.start();
    }

    /** Queues one logging event; the consumer thread persists it asynchronously. */
    @Override
    protected void append(LoggingEvent eventObject) {
        eventBuffer.addLogEvent(eventObject);
    }

    // ======================= logback-injected configuration =======================

    // ---- hbase ----
    public void setTableName(String tableName) {
        hbaseAppenderProperties.getHbaseProperties().setTableName(tableName);
    }

    public void setFamilyName(String familyName) {
        hbaseAppenderProperties.getHbaseProperties().setFamilyName(familyName);
    }

    public void setZkHosts(String zkHosts) {
        hbaseAppenderProperties.getHbaseProperties().setZkHosts(zkHosts);
    }

    public void setZkZnodeParent(String zkZnodeParent) {
        hbaseAppenderProperties.getHbaseProperties().setZkZnodeParent(zkZnodeParent);
    }

    public void setHdfsUserName(String hdfsUserName) {
        hbaseAppenderProperties.getHbaseProperties().setHdfsUserName(hdfsUserName);
    }

    public void setRowkey(String rowkey) {
        hbaseAppenderProperties.getHbaseProperties().setRowkey(rowkey);
    }

    // ---- buffer ----
    public void setSleepTime(Long sleepTime) {
        hbaseAppenderProperties.getBufferProperties().setSleepTime(sleepTime);
    }

    public void setBufferSize(Integer bufferSize) {
        hbaseAppenderProperties.getBufferProperties().setBufferSize(bufferSize);
    }
}

+ 76 - 0
common-logback-starter/src/main/java/com.yihu.base/hbase/buffer/HBaseBufferConsumer.java

@ -0,0 +1,76 @@
package com.yihu.base.hbase.buffer;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.alibaba.fastjson.JSON;
import com.yihu.base.hbase.config.HbaseFactory;
import com.yihu.base.hbase.properties.HbaseAppenderProperties;
import com.yihu.base.hbase.properties.HbaseProperties;
import com.yihu.base.hbase.rowkey.RowkeyFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Created by chenweida on 2018/2/24.
 */
public class HBaseBufferConsumer implements Runnable {
    // Source buffer holding queued logging events.
    private HBaseEventBuffer eventBuffer;
    // Full appender configuration (connection + buffer settings).
    private HbaseAppenderProperties hbaseAppenderProperties;
    // HBase-specific settings (table, family, rowkey rule).
    private HbaseProperties hbaseProperties;
    // Writes batches of rows into HBase.
    private HbaseFactory hbaseFactory = null;

    public HBaseBufferConsumer(HBaseEventBuffer eventBuffer, HbaseAppenderProperties hbaseAppenderProperties, HbaseFactory hbaseFactory) {
        this.eventBuffer = eventBuffer;
        this.hbaseAppenderProperties = hbaseAppenderProperties;
        this.hbaseFactory = hbaseFactory;
        this.hbaseProperties = hbaseAppenderProperties.getHbaseProperties();
    }

    /**
     * Endless polling loop: drains all buffered events, converts each one into a
     * rowkey plus a family->qualifier->value map, bulk-writes the batch to HBase,
     * then sleeps for the configured interval.
     */
    @Override
    public void run() {
        while (true) {
            try {
                // Nothing queued yet: wait one interval, then poll again.
                if (eventBuffer.getBuffer().size() == 0) {
                    sleep();
                    continue;
                }
                // Drain every pending event in one pass.
                List<ILoggingEvent> drained = new ArrayList<>();
                eventBuffer.getBuffer().drainTo(drained);
                List<String> rowkeys = new ArrayList<>();
                List<Map<String, Map<String, String>>> families = new ArrayList<>();
                for (ILoggingEvent event : drained) {
                    rowkeys.add(RowkeyFactory.getRowkey(hbaseProperties));
                    // NOTE(review): the formatted message is assumed to be a JSON
                    // object of column -> value — confirm against the log pattern.
                    Map<String, String> columns = (Map<String, String>) JSON.parse(event.getFormattedMessage());
                    Map<String, Map<String, String>> row = new HashMap<>();
                    row.put(hbaseProperties.getFamilyName(), columns);
                    families.add(row);
                }
                hbaseFactory.addLogBulk(hbaseProperties.getTableName(), rowkeys, families);
                // Pause before the next polling round.
                sleep();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Sleeps for the configured consume interval. */
    public void sleep() throws Exception {
        Thread.sleep(hbaseAppenderProperties.getBufferProperties().getSleepTime());
    }
}

+ 31 - 0
common-logback-starter/src/main/java/com.yihu.base/hbase/buffer/HBaseEventBuffer.java

@ -0,0 +1,31 @@
package com.yihu.base.hbase.buffer;
import ch.qos.logback.classic.spi.LoggingEvent;
import com.yihu.base.hbase.properties.BufferProperties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Created by chenweida on 2018/2/24.
 */
public class HBaseEventBuffer {
    // Bounded buffer of pending logging events.
    // FIX: was a raw BlockingQueue; now parameterized to avoid unchecked usage.
    BlockingQueue<LoggingEvent> queue = null;

    /**
     * Creates the buffer with the configured capacity.
     *
     * @param bufferProperties supplies the queue capacity
     */
    public HBaseEventBuffer(BufferProperties bufferProperties) {
        queue = new ArrayBlockingQueue<>(bufferProperties.getBufferSize());
    }

    /** Returns the underlying queue (the consumer drains it in batches). */
    public BlockingQueue<LoggingEvent> getBuffer() {
        return queue;
    }

    public void setQueue(BlockingQueue<LoggingEvent> queue) {
        this.queue = queue;
    }

    /**
     * Enqueues one event.
     * NOTE(review): uses add(), which throws IllegalStateException when the
     * buffer is full — confirm whether dropping (offer) would be preferable
     * for a log appender.
     */
    public void addLogEvent(LoggingEvent eventObject) {
        queue.add(eventObject);
    }
}

+ 153 - 0
common-logback-starter/src/main/java/com.yihu.base/hbase/config/HbaseFactory.java

@ -0,0 +1,153 @@
package com.yihu.base.hbase.config;
import com.yihu.base.hbase.properties.HbaseProperties;
import com.yihu.base.hbase.rowkey.IRowkeyGenerate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.TableCallback;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Created by chenweida on 2018/2/27.
 */
public class HbaseFactory {
    private HbaseTemplate hbaseTemplate = new HbaseTemplate();
    private HbaseProperties hbaseProperties;
    // Lazily built, shared Hadoop configuration (double-checked locking; hence volatile).
    private volatile Configuration configuration;

    public HbaseFactory(HbaseProperties hbaseProperties) {
        this.hbaseProperties = hbaseProperties;
    }

    /**
     * Bulk-inserts rows into the given table.
     *
     * @param tableName  target HBase table
     * @param rowkeyList one rowkey per row
     * @param familyList per row: family name -> (qualifier -> value); entries are
     *                   positionally matched with {@code rowkeyList}
     * @throws Exception when the underlying HBase write fails
     */
    public void addLogBulk(String tableName, List<String> rowkeyList, List<Map<String, Map<String, String>>> familyList) throws Exception {
        hbaseTemplate.execute(tableName, new TableCallback<String>() {
            @Override
            public String doInTable(HTableInterface table) throws Throwable {
                List<Put> list = new ArrayList<>();
                for (int i = 0; i < rowkeyList.size(); i++) {
                    Put p = new Put(rowkeyList.get(i).getBytes());
                    Map<String, Map<String, String>> family = familyList.get(i);
                    for (String familyName : family.keySet()) {
                        Map<String, String> map = family.get(familyName);
                        for (String qualifier : map.keySet()) {
                            String value = map.get(qualifier);
                            if (value == null) {
                                continue; // HBase cells cannot hold null values
                            }
                            p.add(familyName.getBytes(), qualifier.getBytes(), value.getBytes());
                        }
                    }
                    list.add(p);
                }
                table.put(list);
                return null;
            }
        });
    }

    /**
     * Initializes the connection and creates the configured table when it does
     * not exist yet. Failures are logged and swallowed (best effort), matching
     * the appender's fire-and-forget contract.
     */
    public void init() {
        Connection connection = null;
        HBaseAdmin hBaseAdmin = null;
        try {
            connection = getConnection();
            hBaseAdmin = (HBaseAdmin) connection.getAdmin();
            if (!hBaseAdmin.tableExists(hbaseProperties.getTableName())) {
                createTable(hbaseProperties);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (hBaseAdmin != null) {
                    hBaseAdmin.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Opens a connection, lazily building the shared configuration first.
     *
     * BUG FIX: the original always set the default znode parent, silently
     * ignoring {@code setZkZnodeParent(...)}; the configured value is now used
     * and the constant only serves as a fallback.
     *
     * @return a fresh HBase connection (caller closes it)
     * @throws IOException on connection failure
     */
    private Connection getConnection() throws IOException {
        if (configuration == null) {
            synchronized (HbaseFactory.class) {
                if (configuration == null) {
                    // Hadoop account used for the connection.
                    System.setProperty("HADOOP_USER_NAME", hbaseProperties.getHdfsUserName());
                    Configuration conf = HBaseConfiguration.create();
                    conf.set("hbase.zookeeper.quorum", hbaseProperties.getZkHosts());
                    String znodeParent = hbaseProperties.getZkZnodeParent();
                    if (znodeParent == null || znodeParent.isEmpty()) {
                        znodeParent = HbaseProperties.default_zkZnodeParent;
                    }
                    conf.set("zookeeper.znode.parent", znodeParent);
                    conf.set("hbase.zookeeper.property.clientPort", HbaseProperties.default_zkPort);
                    hbaseTemplate.setConfiguration(conf);
                    configuration = conf;
                }
            }
        }
        return ConnectionFactory.createConnection(configuration);
    }

    /**
     * Creates the configured table with one column family per configured name.
     * FIX: admin and connection are now closed in finally blocks so they are
     * released even when table creation throws.
     *
     * @param hbaseProperties table name and family names
     * @throws Exception on connection or DDL failure
     */
    private void createTable(HbaseProperties hbaseProperties) throws Exception {
        Connection connection = getConnection();
        try {
            HBaseAdmin hBaseAdmin = (HBaseAdmin) connection.getAdmin();
            try {
                HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(hbaseProperties.getTableName()));
                // Keep the number of column families small (1-3 recommended).
                for (String family : hbaseProperties.getFamilyNames()) {
                    HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(family);
                    hColumnDescriptor.setBlockCacheEnabled(true); // enable block cache for reads
                    hColumnDescriptor.setInMemory(true);          // prefer keeping this family in memory
                    hColumnDescriptor.setMaxVersions(1);          // keep a single version per cell
                    hTableDescriptor.addFamily(hColumnDescriptor);
                }
                hBaseAdmin.createTable(hTableDescriptor);
            } finally {
                hBaseAdmin.close();
            }
        } finally {
            connection.close();
        }
    }
}

+ 27 - 0
common-logback-starter/src/main/java/com.yihu.base/hbase/properties/BufferProperties.java

@ -0,0 +1,27 @@
package com.yihu.base.hbase.properties;
/**
 * Created by chenweida on 2018/2/24.
 * 缓冲区相关配置
 */
public class BufferProperties {
    /** Interval in milliseconds between two consume passes. */
    private Long sleepTime = 1000L;
    /** Capacity of the in-memory event queue. */
    private Integer bufferSize = 100000;

    public Long getSleepTime() {
        return sleepTime;
    }

    public void setSleepTime(Long sleepTime) {
        this.sleepTime = sleepTime;
    }

    public Integer getBufferSize() {
        return bufferSize;
    }

    public void setBufferSize(Integer bufferSize) {
        this.bufferSize = bufferSize;
    }
}

+ 28 - 0
common-logback-starter/src/main/java/com.yihu.base/hbase/properties/HbaseAppenderProperties.java

@ -0,0 +1,28 @@
package com.yihu.base.hbase.properties;
/**
 * Created by chenweida on 2018/2/24.
 */
public class HbaseAppenderProperties {
    /** HBase connection settings (zookeeper, table, family, rowkey rule). */
    private HbaseProperties hbaseProperties = new HbaseProperties();
    /** In-memory buffer settings (capacity, consume interval). */
    private BufferProperties bufferProperties = new BufferProperties();

    public HbaseProperties getHbaseProperties() {
        return hbaseProperties;
    }

    public void setHbaseProperties(HbaseProperties hbaseProperties) {
        this.hbaseProperties = hbaseProperties;
    }

    public BufferProperties getBufferProperties() {
        return bufferProperties;
    }

    public void setBufferProperties(BufferProperties bufferProperties) {
        this.bufferProperties = bufferProperties;
    }
}

+ 78 - 0
common-logback-starter/src/main/java/com.yihu.base/hbase/properties/HbaseProperties.java

@ -0,0 +1,78 @@
package com.yihu.base.hbase.properties;
/**
 * Created by chenweida on 2018/2/24.
 * ES相关配置
 */
public class HbaseProperties {
    /** Default znode parent path on zookeeper. */
    public static final String default_zkZnodeParent = "/hbase-unsecure";
    /** Default zookeeper client port. */
    public static final String default_zkPort = "2181";
    /** Name of the UUID rowkey-generation rule. */
    public static final String rowkeyGenerate_uuid = "UUID";

    private String tableName;     // table name
    private String familyName;    // column family name(s), comma separated
    private String zkHosts;       // zookeeper hosts
    private String zkZnodeParent; // znode parent path on zookeeper
    private String hdfsUserName;  // HDFS user name
    private String rowkey;        // rowkey generation rule, e.g. UUID

    public String getTableName() {
        return tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    public String getFamilyName() {
        return familyName;
    }

    /**
     * Splits the comma-separated family list.
     * FIX: returns an empty array instead of throwing NPE when familyName is unset.
     */
    public String[] getFamilyNames() {
        if (familyName == null || familyName.isEmpty()) {
            return new String[0];
        }
        return familyName.split(",");
    }

    public void setFamilyName(String familyName) {
        this.familyName = familyName;
    }

    public String getZkHosts() {
        return zkHosts;
    }

    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }

    public String getZkZnodeParent() {
        return zkZnodeParent;
    }

    public void setZkZnodeParent(String zkZnodeParent) {
        this.zkZnodeParent = zkZnodeParent;
    }

    public String getHdfsUserName() {
        return hdfsUserName;
    }

    public void setHdfsUserName(String hdfsUserName) {
        this.hdfsUserName = hdfsUserName;
    }

    public String getRowkey() {
        return rowkey;
    }

    public void setRowkey(String rowkey) {
        this.rowkey = rowkey;
    }
}

+ 9 - 0
common-logback-starter/src/main/java/com.yihu.base/hbase/rowkey/IRowkeyGenerate.java

@ -0,0 +1,9 @@
package com.yihu.base.hbase.rowkey;
/**
 * Created by chenweida on 2018/2/28.
 */
public interface IRowkeyGenerate {
    /**
     * Generates the HBase row key for one log record.
     *
     * @return a unique row key string
     */
    String getRowkey();
}

+ 48 - 0
common-logback-starter/src/main/java/com.yihu.base/hbase/rowkey/RowkeyFactory.java

@ -0,0 +1,48 @@
package com.yihu.base.hbase.rowkey;
import com.yihu.base.hbase.properties.HbaseProperties;
import com.yihu.base.hbase.rowkey.impl.UUIDRowkeyGenerate;
import org.apache.commons.lang3.StringUtils;
/**
 * Created by chenweida on 2018/2/28.
 */
public class RowkeyFactory {
    // Lazily created singleton generator (double-checked locking; hence volatile).
    private volatile static IRowkeyGenerate rowkeyGenerate = null;

    private RowkeyFactory() {
    }

    /**
     * Returns the next row key, lazily initializing the generator from the
     * configured rule on first use.
     *
     * @param hbaseProperties supplies the rowkey rule name
     * @return a freshly generated row key
     */
    public static String getRowkey(HbaseProperties hbaseProperties) {
        if (rowkeyGenerate == null) {
            synchronized (RowkeyFactory.class) {
                if (rowkeyGenerate == null) {
                    initIRowkeyGenerate(hbaseProperties);
                }
            }
        }
        return rowkeyGenerate.getRowkey();
    }

    /**
     * Selects the generator named by the "rowkey" property; empty/unset defaults to UUID.
     *
     * BUG FIX: the original condition was inverted — a NON-empty value fell
     * straight through to the UUID default and an EMPTY value was switched on
     * (risking NPE), so the configured rule was never honored.
     */
    private static void initIRowkeyGenerate(HbaseProperties hbaseProperties) {
        String rule = hbaseProperties.getRowkey();
        if (rule == null || rule.isEmpty()) {
            // Nothing configured: default to UUID.
            rowkeyGenerate = new UUIDRowkeyGenerate();
            return;
        }
        switch (rule) {
            case HbaseProperties.rowkeyGenerate_uuid: {
                rowkeyGenerate = new UUIDRowkeyGenerate();
                return;
            }
            default: {
                // Unknown rule names fall back to UUID.
                rowkeyGenerate = new UUIDRowkeyGenerate();
                return;
            }
        }
    }
}

+ 15 - 0
common-logback-starter/src/main/java/com.yihu.base/hbase/rowkey/impl/UUIDRowkeyGenerate.java

@ -0,0 +1,15 @@
package com.yihu.base.hbase.rowkey.impl;
import com.yihu.base.hbase.rowkey.IRowkeyGenerate;
import java.util.UUID;
/**
 * Created by chenweida on 2018/2/28.
 */
public class UUIDRowkeyGenerate implements IRowkeyGenerate {
    /** Produces a random, dash-free 32-character row key from a type-4 UUID. */
    @Override
    public String getRowkey() {
        String raw = UUID.randomUUID().toString();
        return raw.replace("-", "");
    }
}

+ 12 - 0
common-logback-starter/src/main/java/com.yihu.base/hbase/timer/MajorTimer.java

@ -0,0 +1,12 @@
package com.yihu.base.hbase.timer;
import java.util.Timer;
/**
 * Created by chenweida on 2018/2/27.
 *
 * Placeholder timer for manually scheduled HBase major compactions.
 * To tune HBase throughput, automatic major compaction can be disabled
 * (hbase.hregion.majorcompaction) and compactions triggered from a timer in
 * the small hours instead, because files cannot be operated on while a
 * compaction is running. No scheduling logic is implemented here yet.
 */
public class MajorTimer extends Timer {
}

+ 74 - 0
common-logback-starter/src/main/java/com.yihu.base/hdfs/HDFSAppender.java

@ -0,0 +1,74 @@
package com.yihu.base.hdfs;
import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.yihu.base.hdfs.buffer.HDFSBufferConsumer;
import com.yihu.base.hdfs.buffer.HDFSEventBuffer;
import com.yihu.base.hdfs.properties.HDFSAppenderProperties;
/**
 * Created by chenweida on 2018/2/26.
 */
public class HDFSAppender extends AppenderBase<LoggingEvent> {
    // Aggregated configuration: HDFS target plus buffer settings.
    private HDFSAppenderProperties hdfsAppenderProperties = new HDFSAppenderProperties();
    // In-memory queue decoupling logging calls from HDFS writes.
    private HDFSEventBuffer eventBuffer;
    // Background thread that drains the buffer toward HDFS.
    private Thread bufferConsumerThread = null;

    /**
     * Starts the appender: builds the buffer and launches the consumer thread.
     */
    @Override
    public void start() {
        super.start();
        eventBuffer = new HDFSEventBuffer(hdfsAppenderProperties.getBufferProperties());
        HDFSBufferConsumer bufferConsumer = new HDFSBufferConsumer(eventBuffer, hdfsAppenderProperties);
        bufferConsumerThread = new Thread(bufferConsumer);
        bufferConsumerThread.start();
    }

    /**
     * Queues one logging event for the consumer thread.
     *
     * BUG FIX: the original body was empty, so events never reached the buffer
     * and the consumer forever drained an empty queue.
     */
    @Override
    protected void append(LoggingEvent eventObject) {
        eventBuffer.addLogEvent(eventObject);
    }

    // ======================= logback-injected configuration =======================

    // ---- hdfs ----
    public void setHosts(String hosts) {
        hdfsAppenderProperties.getHdfsProperties().setHosts(hosts);
    }

    public void setPath(String path) {
        hdfsAppenderProperties.getHdfsProperties().setPath(path);
    }

    public void setFileName(String fileName) {
        hdfsAppenderProperties.getHdfsProperties().setFileName(fileName);
    }

    public void setRolling(String rolling) {
        hdfsAppenderProperties.getHdfsProperties().setRolling(rolling);
    }

    public void setSuffix(String suffix) {
        hdfsAppenderProperties.getHdfsProperties().setSuffix(suffix);
    }

    // ---- buffer ----
    public void setSleepTime(Long sleepTime) {
        hdfsAppenderProperties.getBufferProperties().setSleepTime(sleepTime);
    }

    public void setBufferSize(Integer bufferSize) {
        hdfsAppenderProperties.getBufferProperties().setBufferSize(bufferSize);
    }
}

+ 55 - 0
common-logback-starter/src/main/java/com.yihu.base/hdfs/buffer/HDFSBufferConsumer.java

@ -0,0 +1,55 @@
package com.yihu.base.hdfs.buffer;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.yihu.base.hdfs.properties.HDFSAppenderProperties;
import java.util.ArrayList;
import java.util.List;
/**
 * Created by chenweida on 2018/2/24.
 */
/**
 * Drains buffered logging events on a fixed interval and (eventually)
 * writes them to HDFS.
 */
public class HDFSBufferConsumer implements Runnable {
    // Shared buffer filled by the appender.
    private HDFSEventBuffer eventBuffer;
    // Consumer configuration (sleep interval, HDFS target).
    private HDFSAppenderProperties hdfsAppenderProperties;

    public HDFSBufferConsumer(HDFSEventBuffer eventBuffer, HDFSAppenderProperties hdfsAppenderProperties) {
        this.eventBuffer = eventBuffer;
        this.hdfsAppenderProperties = hdfsAppenderProperties;
    }

    @Override
    public void run() {
        // BUGFIX: honor interruption so the thread can actually be stopped;
        // the original generic catch swallowed InterruptedException and
        // looped forever.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Sleep when the queue is empty.
                if (eventBuffer.getBuffer().size() == 0) {
                    sleep();
                    continue;
                }
                List<ILoggingEvent> eventObjectList = new ArrayList<>();
                // Take everything currently queued.
                eventBuffer.getBuffer().drainTo(eventObjectList);
                // TODO(review): drained events are currently discarded — the
                // HDFS write (create file if absent, then append) is not yet
                // implemented here.
                sleep();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and exit the loop.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Sleep for the configured consumer interval.
    public void sleep() throws Exception {
        Thread.sleep(hdfsAppenderProperties.getBufferProperties().getSleepTime());
    }
}

+ 31 - 0
common-logback-starter/src/main/java/com.yihu.base/hdfs/buffer/HDFSEventBuffer.java

@ -0,0 +1,31 @@
package com.yihu.base.hdfs.buffer;
import ch.qos.logback.classic.spi.LoggingEvent;
import com.yihu.base.hdfs.properties.BufferProperties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Created by chenweida on 2018/2/24.
 */
/**
 * Bounded in-memory buffer between the logging threads and the HDFS
 * consumer thread.
 */
public class HDFSEventBuffer {
    // Bounded blocking queue of pending events (typed instead of raw).
    BlockingQueue<LoggingEvent> queue = null;

    public HDFSEventBuffer(BufferProperties bufferProperties) {
        queue = new ArrayBlockingQueue<>(bufferProperties.getBufferSize());
    }

    public BlockingQueue<LoggingEvent> getBuffer() {
        return queue;
    }

    public void setQueue(BlockingQueue<LoggingEvent> queue) {
        this.queue = queue;
    }

    /**
     * Enqueues an event, dropping it if the buffer is full.
     * BUGFIX: the original used add(), which throws IllegalStateException on
     * a full bounded queue and would propagate into application logging
     * calls; offer() drops the event instead of crashing the caller.
     */
    public void addLogEvent(LoggingEvent eventObject) {
        queue.offer(eventObject);
    }
}

+ 27 - 0
common-logback-starter/src/main/java/com.yihu.base/hdfs/properties/BufferProperties.java

@ -0,0 +1,27 @@
package com.yihu.base.hdfs.properties;
/**
 * Created by chenweida on 2018/2/24.
 * 缓冲区相关配置
 */
/**
 * Configuration for the in-memory event buffer.
 */
public class BufferProperties {
    // Interval, in milliseconds, between two consumer drain passes.
    private Long sleepTime = 1000L;
    // Maximum number of events the in-memory buffer can hold.
    private Integer bufferSize = 100000;

    public Long getSleepTime() {
        return sleepTime;
    }

    public void setSleepTime(Long sleepTime) {
        this.sleepTime = sleepTime;
    }

    public Integer getBufferSize() {
        return bufferSize;
    }

    public void setBufferSize(Integer bufferSize) {
        this.bufferSize = bufferSize;
    }
}

+ 27 - 0
common-logback-starter/src/main/java/com.yihu.base/hdfs/properties/HDFSAppenderProperties.java

@ -0,0 +1,27 @@
package com.yihu.base.hdfs.properties;
/**
 * Created by chenweida on 2018/2/26.
 */
/**
 * Aggregates the two configuration groups used by the HDFS appender:
 * the HDFS target settings and the in-memory buffer settings.
 */
public class HDFSAppenderProperties {
    // HDFS connection / file settings.
    private HDFSProperties hdfsProperties = new HDFSProperties();
    // In-memory buffer settings.
    private BufferProperties bufferProperties = new BufferProperties();

    public HDFSProperties getHdfsProperties() {
        return hdfsProperties;
    }

    public void setHdfsProperties(HDFSProperties hdfsProperties) {
        this.hdfsProperties = hdfsProperties;
    }

    public BufferProperties getBufferProperties() {
        return bufferProperties;
    }

    public void setBufferProperties(BufferProperties bufferProperties) {
        this.bufferProperties = bufferProperties;
    }
}

+ 55 - 0
common-logback-starter/src/main/java/com.yihu.base/hdfs/properties/HDFSProperties.java

@ -0,0 +1,55 @@
package com.yihu.base.hdfs.properties;
import com.yihu.base.common.RollingUtil;
/**
 * Created by chenweida on 2018/2/26.
 */
/**
 * HDFS target settings for the appender.
 */
public class HDFSProperties {
    // HDFS endpoint(s), e.g. hdfs://192.168.131.240:9000/
    private String hosts;
    // Directory the log file is written to.
    private String path;
    // Base name of the log file.
    private String fileName;
    // Rolling period: day / week / month / year.
    private String rolling;
    // File suffix, "log" by default.
    private String suffix = "log";

    public String getHosts() {
        return hosts;
    }

    public void setHosts(String hosts) {
        this.hosts = hosts;
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    // NOTE(review): unlike the other getters this does not return the raw
    // field — it returns the rolled file name derived from fileName, rolling
    // and suffix via RollingUtil. Callers relying on the raw value should be
    // aware of this asymmetry.
    public String getFileName() {
        return RollingUtil.getRollingAppendFirst(fileName, rolling, suffix);
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    public String getRolling() {
        return rolling;
    }

    public void setRolling(String rolling) {
        this.rolling = rolling;
    }

    public String getSuffix() {
        return suffix;
    }

    public void setSuffix(String suffix) {
        this.suffix = suffix;
    }
}

+ 102 - 0
common-logback-starter/src/main/java/com.yihu.base/hdfs/util/HDFSUtil.java

@ -0,0 +1,102 @@
package com.yihu.base.hdfs.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.OutputStream;
import java.net.URI;
/**
 * Created by chenweida on 2018/2/26.
 */
/**
 * Helper for appending text lines to files on HDFS.
 * <p>
 * Append support requires Hadoop 2.x and either the hdfs-site.xml setting
 * <property><name>dfs.support.append</name><value>true</value></property>
 * or the programmatic equivalent used below:
 * conf.setBoolean("dfs.support.append", true);
 * See https://www.cnblogs.com/flyeast/archive/2014/07/20/3856356.html
 */
public class HDFSUtil {
    private static Logger logger = LoggerFactory.getLogger(HDFSUtil.class);

    /**
     * Appends one line to a file on HDFS, creating the file first if it
     * does not exist.
     *
     * @param uri     HDFS endpoint, e.g. hdfs://host:9000/
     * @param path    file path inside HDFS
     * @param message line to append (a CRLF is added)
     */
    public static void appendFile(String uri, String path, String message) {
        try {
            Configuration conf = new Configuration();
            conf.setBoolean("dfs.support.append", true); // enable append mode
            // FileSystem.get returns a cached, shared instance — do not close it here.
            FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
            if (!exists(fileSystem, path)) {
                // BUGFIX: the stream returned by create() was never closed,
                // leaking it and holding the HDFS lease so the subsequent
                // append could fail.
                fileSystem.create(new Path(path)).close();
            }
            append(fileSystem, message, path);
        } catch (Exception e) {
            // BUGFIX: log the full stack trace, not just e.getMessage().
            logger.error("Failed to append to HDFS file " + path, e);
        }
    }

    /**
     * Returns whether the given path exists; errors are logged and treated
     * as "does not exist".
     */
    private static boolean exists(FileSystem fileSystem, String pathStr) {
        try {
            return fileSystem.exists(new Path(pathStr));
        } catch (Exception e) {
            logger.error("Existence check failed for " + pathStr, e);
            return false;
        }
    }

    /**
     * Appends the message plus CRLF to an existing file.
     */
    private static void append(FileSystem fileSystem, String message, String path)
            throws Exception {
        // try-with-resources guarantees the stream is closed even on error.
        try (OutputStream out = fileSystem.append(new Path(path))) {
            out.write((message + "\r\n").getBytes("UTF-8"));
            out.flush();
        }
    }

    /** Small manual smoke test. */
    public static void main(String[] args) throws Exception {
        String uri = "hdfs://172.17.110.20:8020/";
        String message = "ceshi";
        String path = "/user/root/ceshi123.log";
        while (true) {
            HDFSUtil.appendFile(uri, path, message);
            // BUGFIX: pause between iterations — the original tight loop
            // hammered the NameNode continuously.
            Thread.sleep(1000);
        }
    }
}

+ 28 - 0
common-logback-starter/src/main/resources/hbaseAppender_logback_demo.xml

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- 这个是根配置文件,一定要有的
    scan:是当配置文件被修改后会被重新加载
    scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。
    debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。
-->
<configuration scan="true" scanPeriod="6000" debug="false">
    <logger name="org.hibernate" level="WARN"/>
    <logger name="org.springframework" level="WARN"/>
    <appender name="hbase_appender" class="com.yihu.base.hbase.HbaseAppender">
        <tableName>tableName</tableName><!--表名-->
        <familyName>familyName</familyName> <!--列族-->
        <zkHosts>node1.hde.h3c.com,node2.hde.h3c.com,node3.hde.h3c.com</zkHosts>  <!--zk 路径-->
        <hdfsUserName>root</hdfsUserName> <!--hdfs的用户名-->
        <rowkey>UUID</rowkey> <!--rowKey规则  目前只支持UUID 默认UUID -->
    </appender>
    <logger name="hbase_logger" level="INFO" additivity="false">
        <appender-ref ref="hbase_appender"/>
    </logger>
    <!--提高整个日志的错误级别-->
    <root level="INFO">
    </root>
</configuration>

+ 34 - 0
common-logback-starter/src/main/resources/hdfsAppender_logback_demo.xml

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- 这个是根配置文件,一定要有的
    scan:是当配置文件被修改后会被重新加载
    scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。
    debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。
-->
<configuration scan="true" scanPeriod="6000" debug="false">
    <logger name="org.hibernate" level="WARN"/>
    <logger name="org.springframework" level="WARN"/>
    <appender name="hdfs_appender" class="com.yihu.base.hdfs.HDFSAppender">
        <hosts>hdfs://192.168.131.240:9000/</hosts> <!--支持集群 逗号分割-->
        <path>/wlyy/business/</path>
        <fileName>business</fileName><!--文件名称-->
        <suffix>log</suffix> <!--后缀 默认log-->
        <!--按照什么模式区滚动 支持 按照
         每天:day
         每周:week
         每月:month
         每年:year
          -->
        <rolling>day</rolling>
    </appender>
    <logger name="hdfs_logger" level="INFO" additivity="false">
        <appender-ref ref="hdfs_appender"/>
    </logger>
    <!--提高整个日志的错误级别-->
    <root level="INFO">
    </root>
</configuration>

+ 11 - 1
demo/pom.xml

@ -24,7 +24,17 @@
        <dependency>
            <groupId>com.yihu</groupId>
            <artifactId>common-logback-starter</artifactId>
            <version>1.0.1</version>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>4.3.13.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>
    </dependencies>
</project>

+ 1 - 2
demo/src/main/java/com/demo/controller/DemoController.java

@ -12,7 +12,7 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
public class DemoController {
    private Logger logger = LoggerFactory.getLogger("elasticsearch_logger");
    private Logger logger = LoggerFactory.getLogger("hbase_logger");
    @RequestMapping(value = "/loginfo", method = RequestMethod.GET)
    public String loginfo() {
@ -20,7 +20,6 @@ public class DemoController {
        while (flag) {
            logger.info("{\"aaa\":\"123\",\"bbb\":\"123\",\"ccc\":\"123\"}");
        }
        return "成功";
    }
}

+ 10 - 8
demo/src/main/resources/logback.xml

@ -7,12 +7,14 @@
    <logger name="org.springframework" level="WARN"/>
    <logger name="io.searchbox" level="WARN"/>
    <appender name="elasticsearch" class="com.yihu.base.es.ElasticsearchAppender">
        <hosts>http://172.19.103.68:9200</hosts>
        <index>logs</index>
        <type>logs</type>
        <clusterName>jkzl</clusterName>
        <rolling>day</rolling>
    <appender name="hbase_appender" class="com.yihu.base.hbase.HbaseAppender">
        <tableName>tableName</tableName><!--表名-->
        <familyName>familyName</familyName> <!--列族-->
        <zkHosts>node1.hde.h3c.com,node2.hde.h3c.com,node3.hde.h3c.com</zkHosts>  <!--zk 路径-->
        <hdfsUserName>root</hdfsUserName> <!--hdfs的用户名-->
        <rowkey>UUID</rowkey> <!--rowKey规则  目前只支持UUID 默认UUID -->
    </appender>
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
@ -22,8 +24,8 @@
        </encoder>
    </appender>
    <logger name="elasticsearch_logger" level="INFO" additivity="false">
        <appender-ref ref="elasticsearch"/>
    <logger name="hbase_logger" level="INFO" additivity="false">
        <appender-ref ref="hbase_appender"/>
        <appender-ref ref="console"/>
    </logger>

+ 1 - 1
pom.xml

@ -56,7 +56,7 @@
        <version.mysql>5.1.38</version.mysql>
        <version.jackson>2.8.1</version.jackson>
        <version.myCommon>1.0.0</version.myCommon>
        <version.spring-data-hadoop>2.3.0.RELEASE</version.spring-data-hadoop>
        <version.spring-data-hadoop>2.5.0.RELEASE</version.spring-data-hadoop>
        <version.springside>4.2.3-GA</version.springside>
        <version.zipkin>1.24.0</version.zipkin>
        <version.hibernate>5.0.12.Final</version.hibernate>