chenweida 6 years ago
parent
commit
099f731be4
23 changed files with 955 additions and 29 deletions
  1. +29 −29   common-logback-starter/pom.xml
  2. +0 −0     common-logback-starter/src/main/resources/bak/hbase/HbaseAppender.java
  3. +0 −0     common-logback-starter/src/main/resources/bak/hbase/buffer/HBaseBufferConsumer.java
  4. +0 −0     common-logback-starter/src/main/resources/bak/hbase/buffer/HBaseEventBuffer.java
  5. +0 −0     common-logback-starter/src/main/resources/bak/hbase/config/HbaseFactory.java
  6. +86 −0    common-logback-starter/src/main/resources/bak_java/hbase/HbaseAppender.java
  7. +76 −0    common-logback-starter/src/main/resources/bak_java/hbase/buffer/HBaseBufferConsumer.java
  8. +31 −0    common-logback-starter/src/main/resources/bak_java/hbase/buffer/HBaseEventBuffer.java
  9. +158 −0   common-logback-starter/src/main/resources/bak_java/hbase/config/HbaseFactory.java
  10. +0 −0    common-logback-starter/src/main/resources/bak_java/hbase/properties/BufferProperties.java
  11. +28 −0   common-logback-starter/src/main/resources/bak_java/hbase/properties/HbaseAppenderProperties.java
  12. +92 −0   common-logback-starter/src/main/resources/bak_java/hbase/properties/HbaseProperties.java
  13. +9 −0    common-logback-starter/src/main/resources/bak_java/hbase/rowkey/IRowkeyGenerate.java
  14. +48 −0   common-logback-starter/src/main/resources/bak_java/hbase/rowkey/RowkeyFactory.java
  15. +15 −0   common-logback-starter/src/main/resources/bak_java/hbase/rowkey/impl/UUIDRowkeyGenerate.java
  16. +12 −0   common-logback-starter/src/main/resources/bak_java/hbase/timer/MajorTimer.java
  17. +74 −0   common-logback-starter/src/main/resources/bak_java/hdfs/HDFSAppender.java
  18. +55 −0   common-logback-starter/src/main/resources/bak_java/hdfs/buffer/HDFSBufferConsumer.java
  19. +31 −0   common-logback-starter/src/main/resources/bak_java/hdfs/buffer/HDFSEventBuffer.java
  20. +27 −0   common-logback-starter/src/main/resources/bak_java/hdfs/properties/BufferProperties.java
  21. +27 −0   common-logback-starter/src/main/resources/bak_java/hdfs/properties/HDFSAppenderProperties.java
  22. +55 −0   common-logback-starter/src/main/resources/bak_java/hdfs/properties/HDFSProperties.java
  23. +102 −0  common-logback-starter/src/main/resources/bak_java/hdfs/util/HDFSUtil.java

+ 29 - 29
common-logback-starter/pom.xml

@@ -18,38 +18,10 @@
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-hbase</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-protocol</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
@@ -77,7 +49,35 @@
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
            <version>18.0</version>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>org.springframework.data</groupId>-->
            <!--<artifactId>spring-data-hadoop-hbase</artifactId>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.apache.hbase</groupId>-->
            <!--<artifactId>hbase-client</artifactId>-->
            <!--<exclusions>-->
                <!--<exclusion>-->
                    <!--<groupId>org.slf4j</groupId>-->
                    <!--<artifactId>slf4j-log4j12</artifactId>-->
                <!--</exclusion>-->
            <!--</exclusions>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.apache.hbase</groupId>-->
            <!--<artifactId>hbase-common</artifactId>-->
            <!--<exclusions>-->
                <!--<exclusion>-->
                    <!--<groupId>org.slf4j</groupId>-->
                    <!--<artifactId>slf4j-log4j12</artifactId>-->
                <!--</exclusion>-->
            <!--</exclusions>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.apache.hbase</groupId>-->
            <!--<artifactId>hbase-protocol</artifactId>-->
        <!--</dependency>-->
    </dependencies>
</project>

common-logback-starter/src/main/java/com.yihu.base/hbase/HbaseAppender.java → common-logback-starter/src/main/resources/bak/hbase/HbaseAppender.java


common-logback-starter/src/main/java/com.yihu.base/hbase/buffer/HBaseBufferConsumer.java → common-logback-starter/src/main/resources/bak/hbase/buffer/HBaseBufferConsumer.java


common-logback-starter/src/main/java/com.yihu.base/hbase/buffer/HBaseEventBuffer.java → common-logback-starter/src/main/resources/bak/hbase/buffer/HBaseEventBuffer.java


common-logback-starter/src/main/java/com.yihu.base/hbase/config/HbaseFactory.java → common-logback-starter/src/main/resources/bak/hbase/config/HbaseFactory.java


+ 86 - 0
common-logback-starter/src/main/resources/bak_java/hbase/HbaseAppender.java

@@ -0,0 +1,86 @@
package com.yihu.base.hbase;
import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.yihu.base.hbase.buffer.HBaseBufferConsumer;
import com.yihu.base.hbase.buffer.HBaseEventBuffer;
import com.yihu.base.hbase.config.HbaseFactory;
import com.yihu.base.hbase.properties.HbaseAppenderProperties;
public class HbaseAppender extends AppenderBase<LoggingEvent> {
    // all appender-related configuration
    private HbaseAppenderProperties hbaseAppenderProperties = new HbaseAppenderProperties();
    // consumer thread
    private Thread bufferConsumerThread = null;
    // in-memory buffer queue
    private HBaseEventBuffer eventBuffer = null;
    private HbaseFactory hbaseFactory = null;
    @Override
    public void start() {
        super.start();
        // initialize the in-memory buffer queue
        eventBuffer = new HBaseEventBuffer(hbaseAppenderProperties.getBufferProperties());
        // initialize the HBase connection
        hbaseFactory = new HbaseFactory(hbaseAppenderProperties.getHbaseProperties());
        hbaseFactory.init();
        // start the consumer thread
        HBaseBufferConsumer bufferConsumer = new HBaseBufferConsumer(eventBuffer, hbaseAppenderProperties, hbaseFactory);
        bufferConsumerThread = new Thread(bufferConsumer);
        bufferConsumerThread.start();
    }
    @Override
    protected void append(LoggingEvent eventObject) {
        // add the log event to the buffer
        eventBuffer.addLogEvent(eventObject);
    }
    //========================================properties========================================
    //==============hbase start==============
    public void setTableName(String tableName) {
        hbaseAppenderProperties.getHbaseProperties().setTableName(tableName);
    }
    public void setFamilyName(String familyName) {
        hbaseAppenderProperties.getHbaseProperties().setFamilyName(familyName);
    }
    public void setZkHosts(String zkHosts) {
        hbaseAppenderProperties.getHbaseProperties().setZkHosts(zkHosts);
    }
    public void setZkZnodeParent(String zkZnodeParent) {
        hbaseAppenderProperties.getHbaseProperties().setZkZnodeParent(zkZnodeParent);
    }
    public void setHdfsUserName(String hdfsUserName) {
        hbaseAppenderProperties.getHbaseProperties().setHdfsUserName(hdfsUserName);
    }
    public void setRowkey(String rowkey) {
        hbaseAppenderProperties.getHbaseProperties().setRowkey(rowkey);
    }
    public void setZkPort(String zkPort) {
        hbaseAppenderProperties.getHbaseProperties().setZkPort(zkPort);
    }
    //==============hbase end==============
    //==============buffer start==============
    public void setSleepTime(Long sleepTime) {
        hbaseAppenderProperties.getBufferProperties().setSleepTime(sleepTime);
    }
    public void setBufferSize(Integer bufferSize) {
        hbaseAppenderProperties.getBufferProperties().setBufferSize(bufferSize);
    }
    //==============buffer end==============
}
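
For reference, a minimal sketch of driving the appender programmatically; in practice it would be declared in logback.xml, and every value below (table, family, ZooKeeper address, and so on) is a placeholder:

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.core.Appender;
import org.slf4j.LoggerFactory;
import com.yihu.base.hbase.HbaseAppender;

public class HbaseAppenderDemo {
    public static void main(String[] args) {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        HbaseAppender appender = new HbaseAppender();
        appender.setContext(context);
        appender.setName("hbase");
        appender.setTableName("app_log");        // placeholder values throughout
        appender.setFamilyName("info");
        appender.setZkHosts("192.168.131.240");
        appender.setZkPort("2181");
        appender.setHdfsUserName("root");
        appender.setRowkey("UUID");
        appender.start();                        // spins up the consumer thread
        Logger root = context.getLogger(Logger.ROOT_LOGGER_NAME);
        // raw cast: the appender is typed on the concrete LoggingEvent class,
        // while addAppender expects Appender<ILoggingEvent>
        root.addAppender((Appender) appender);
    }
}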

+ 76 - 0
common-logback-starter/src/main/resources/bak_java/hbase/buffer/HBaseBufferConsumer.java

@@ -0,0 +1,76 @@
package com.yihu.base.hbase.buffer;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.alibaba.fastjson.JSON;
import com.yihu.base.hbase.config.HbaseFactory;
import com.yihu.base.hbase.properties.HbaseAppenderProperties;
import com.yihu.base.hbase.properties.HbaseProperties;
import com.yihu.base.hbase.rowkey.RowkeyFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Created by chenweida on 2018/2/24.
 */
public class HBaseBufferConsumer implements Runnable {
    // buffer queue
    private HBaseEventBuffer eventBuffer;
    // appender configuration
    private HbaseAppenderProperties hbaseAppenderProperties;
    // HBase configuration
    private HbaseProperties hbaseProperties;
    private HbaseFactory hbaseFactory = null;
    public HBaseBufferConsumer(HBaseEventBuffer eventBuffer, HbaseAppenderProperties hbaseAppenderProperties, HbaseFactory hbaseFactory) {
        this.eventBuffer = eventBuffer;
        this.hbaseAppenderProperties = hbaseAppenderProperties;
        this.hbaseFactory = hbaseFactory;
        this.hbaseProperties = hbaseAppenderProperties.getHbaseProperties();
    }
    @Override
    public void run() {
        while (true) {
            try {
                // sleep when the queue is empty
                if (eventBuffer.getBuffer().size() == 0) {
                    sleep();
                    continue;
                }
                List<ILoggingEvent> eventObjectList = new ArrayList<>();
                // drain everything currently queued
                eventBuffer.getBuffer().drainTo(eventObjectList);
                List<String> rowkeyList = new ArrayList<>();
                List<Map<String, Map<String, String>>> familyList = new ArrayList<>();
                for (ILoggingEvent loggingEvent : eventObjectList) {
                    rowkeyList.add(RowkeyFactory.getRowkey(hbaseProperties));
                    // the formatted message must be a JSON object string
                    Map<String, String> logMap = (Map<String, String>) JSON.parse(loggingEvent.getFormattedMessage());
                    Map<String, Map<String, String>> data = new HashMap<>();
                    data.put(hbaseProperties.getFamilyName(), logMap);
                    familyList.add(data);
                }
                hbaseFactory.addLogBulk(hbaseProperties.getTableName(), rowkeyList, familyList);
                // sleep between batches
                sleep();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public void sleep() throws Exception {
        // pause the consumer thread
        Thread.sleep(hbaseAppenderProperties.getBufferProperties().getSleepTime());
    }
}
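
Because the consumer casts the formatted message straight to a Map, every log call routed to this appender must emit a JSON object whose values are strings; a hypothetical producer side (field names are illustrative) might look like:

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;

public class LogProducerDemo {
    private static final Logger logger = LoggerFactory.getLogger(LogProducerDemo.class);

    public static void main(String[] args) {
        // each entry becomes one column qualifier in the configured family
        Map<String, String> fields = new HashMap<>();
        fields.put("level", "INFO");
        fields.put("service", "demo-service");
        fields.put("message", "user login ok");
        logger.info(JSON.toJSONString(fields));  // a non-JSON message would fail the cast
    }
}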

+ 31 - 0
common-logback-starter/src/main/resources/bak_java/hbase/buffer/HBaseEventBuffer.java

@@ -0,0 +1,31 @@
package com.yihu.base.hbase.buffer;
import ch.qos.logback.classic.spi.LoggingEvent;
import com.yihu.base.hbase.properties.BufferProperties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Created by chenweida on 2018/2/24.
 */
public class HBaseEventBuffer {
    // bounded buffer queue
    BlockingQueue queue = null;
    public HBaseEventBuffer(BufferProperties bufferProperties) {
        queue = new ArrayBlockingQueue(bufferProperties.getBufferSize());
    }
    public BlockingQueue getBuffer() {
        return queue;
    }
    public void setQueue(BlockingQueue queue) {
        this.queue = queue;
    }
    public void addLogEvent(LoggingEvent eventObject) {
        // note: add() throws IllegalStateException when the queue is full,
        // whereas offer() would drop the event instead of failing the log call
        queue.add(eventObject);
    }
}

+ 158 - 0
common-logback-starter/src/main/resources/bak_java/hbase/config/HbaseFactory.java

@@ -0,0 +1,158 @@
package com.yihu.base.hbase.config;
import com.yihu.base.hbase.properties.HbaseProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Created by chenweida on 2018/2/27.
 */
public class HbaseFactory {
    private HbaseTemplate hbaseTemplate = new HbaseTemplate();
    private HbaseProperties hbaseProperties;
    private volatile Configuration configuration;
    public HbaseFactory(HbaseProperties hbaseProperties) {
        this.hbaseProperties = hbaseProperties;
    }
    /**
     * Bulk-insert rows.
     */
    public void addLogBulk(String tableName, List<String> rowkeyList, List<Map<String, Map<String, String>>> familyList) throws Exception {
        hbaseTemplate.execute(tableName, new TableCallback<String>() {
            @Override
            public String doInTable(HTableInterface table) throws Throwable {
                List<Put> list = new ArrayList<>();
                for (int i = 0; i < rowkeyList.size(); i++) {
                    Put p = new Put(rowkeyList.get(i).getBytes());
                    Map<String, Map<String, String>> family = familyList.get(i);
                    for (String familyName : family.keySet()) {
                        Map<String, String> map = family.get(familyName);
                        for (String qualifier : map.keySet()) {
                            String value = map.get(qualifier);
                            if (value == null) {
                                continue;
                            }
                            p.add(familyName.getBytes(), qualifier.getBytes(), value.getBytes());
                        }
                    }
                    list.add(p);
                }
                table.put(list);
                return null;
            }
        });
    }
    public void init() {
        Connection connection = null;
        HBaseAdmin hBaseAdmin = null;
        try {
            // obtain a connection
            connection = getConnection();
            hBaseAdmin = (HBaseAdmin) connection.getAdmin();
            // create the table when it does not exist yet
            if (!hBaseAdmin.tableExists(hbaseProperties.getTableName())) {
                createTable(hbaseProperties);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (hBaseAdmin != null) {
                    hBaseAdmin.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * Obtain a connection; the shared Configuration is built lazily with
     * double-checked locking on the volatile field.
     *
     * @return
     * @throws IOException
     */
    private Connection getConnection() throws IOException {
        if (configuration == null) {
            synchronized (HbaseFactory.class) {
                if (configuration == null) {
                    // set the Hadoop user for the client
                    System.setProperty("HADOOP_USER_NAME", hbaseProperties.getHdfsUserName());
                    configuration = HBaseConfiguration.create();
                    configuration.set("hbase.zookeeper.quorum", hbaseProperties.getZkHosts());
                    if (StringUtils.isEmpty(hbaseProperties.getZkZnodeParent())) {
                        hbaseProperties.setZkZnodeParent(HbaseProperties.default_zkZnodeParent);
                    }
                    configuration.set("zookeeper.znode.parent", hbaseProperties.getZkZnodeParent());
                    if (StringUtils.isEmpty(hbaseProperties.getZkPort())) {
                        hbaseProperties.setZkPort(HbaseProperties.default_zkPort);
                    }
                    configuration.set("hbase.zookeeper.property.clientPort", hbaseProperties.getZkPort());
                    hbaseTemplate.setConfiguration(configuration);
                    hbaseTemplate.setConfiguration(configuration);
                }
            }
        }
        return ConnectionFactory.createConnection(configuration);
    }
    /**
     * Create the table.
     *
     * @param hbaseProperties
     * @throws Exception
     */
    private void createTable(HbaseProperties hbaseProperties) throws Exception {
        Connection connection = getConnection();
        HBaseAdmin hBaseAdmin = (HBaseAdmin) connection.getAdmin();
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(hbaseProperties.getTableName()));
        // HBase guideline: at most one to three column families per table
        for (String family : hbaseProperties.getFamilyNames()) {
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(family);
            hColumnDescriptor.setBlockCacheEnabled(true);  // enable the block (read) cache
            hColumnDescriptor.setInMemory(true);           // keep this family in memory
            hColumnDescriptor.setMaxVersions(1);           // keep a single version per cell
            hTableDescriptor.addFamily(hColumnDescriptor);
        }
        hBaseAdmin.createTable(hTableDescriptor);
        hBaseAdmin.close();
        connection.close();
    }
}
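
A minimal usage sketch of the factory, with placeholder table, family, and connection settings:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import com.yihu.base.hbase.config.HbaseFactory;
import com.yihu.base.hbase.properties.HbaseProperties;

public class HbaseFactoryDemo {
    public static void main(String[] args) throws Exception {
        HbaseProperties props = new HbaseProperties();
        props.setTableName("app_log");       // placeholder values throughout
        props.setFamilyName("info");
        props.setZkHosts("192.168.131.240");
        props.setHdfsUserName("root");

        HbaseFactory factory = new HbaseFactory(props);
        factory.init();  // creates the table on first use

        // one row: rowkey plus family -> (qualifier -> value)
        Map<String, String> columns = new HashMap<>();
        columns.put("message", "hello hbase");
        Map<String, Map<String, String>> row = new HashMap<>();
        row.put("info", columns);

        factory.addLogBulk("app_log",
                Collections.singletonList("rowkey-0001"),
                Collections.singletonList(row));
    }
}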

common-logback-starter/src/main/java/com.yihu.base/hbase/properties/BufferProperties.java → common-logback-starter/src/main/resources/bak_java/hbase/properties/BufferProperties.java


+ 28 - 0
common-logback-starter/src/main/resources/bak_java/hbase/properties/HbaseAppenderProperties.java

@@ -0,0 +1,28 @@
package com.yihu.base.hbase.properties;
/**
 * Created by chenweida on 2018/2/24.
 */
public class HbaseAppenderProperties {
    private HbaseProperties hbaseProperties = new HbaseProperties();
    // buffer-related configuration
    private BufferProperties bufferProperties = new BufferProperties();
    public HbaseProperties getHbaseProperties() {
        return hbaseProperties;
    }
    public void setHbaseProperties(HbaseProperties hbaseProperties) {
        this.hbaseProperties = hbaseProperties;
    }
    public BufferProperties getBufferProperties() {
        return bufferProperties;
    }
    public void setBufferProperties(BufferProperties bufferProperties) {
        this.bufferProperties = bufferProperties;
    }
}

+ 92 - 0
common-logback-starter/src/main/resources/bak_java/hbase/properties/HbaseProperties.java

@@ -0,0 +1,92 @@
package com.yihu.base.hbase.properties;
/**
 * Created by chenweida on 2018/2/24.
 * HBase-related configuration
 */
public class HbaseProperties {
    public static final String default_zkZnodeParent = "/hbase-unsecure";  // default znode parent on ZooKeeper
    public static final String default_zkPort = "2181";  // default ZooKeeper client port
    public static final String rowkeyGenerate_uuid = "UUID";
    private String tableName;   // table name
    private String familyName;  // column family names, comma-separated
    private String zkHosts;     // ZooKeeper hosts
    private String zkPort;      // ZooKeeper client port
    /**
     * For Hortonworks: hconfig.set("zookeeper.znode.parent", "/hbase-unsecure")
     * For Cloudera:    hconfig.set("zookeeper.znode.parent", "/hbase")
     */
    private String zkZnodeParent;  // znode parent on ZooKeeper
    private String hdfsUserName;   // HDFS user name
    private String rowkey;         // rowkey generation strategy, e.g. UUID
    public String getTableName() {
        return tableName;
    }
    public void setTableName(String tableName) {
        this.tableName = tableName;
    }
    public String getFamilyName() {
        return familyName;
    }
    public String[] getFamilyNames() {
        return familyName.split(",");
    }
    public void setFamilyName(String familyName) {
        this.familyName = familyName;
    }
    public String getZkHosts() {
        return zkHosts;
    }
    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }
    public String getZkZnodeParent() {
        return zkZnodeParent;
    }
    public void setZkZnodeParent(String zkZnodeParent) {
        this.zkZnodeParent = zkZnodeParent;
    }
    public String getHdfsUserName() {
        return hdfsUserName;
    }
    public void setHdfsUserName(String hdfsUserName) {
        this.hdfsUserName = hdfsUserName;
    }
    public String getRowkey() {
        return rowkey;
    }
    public void setRowkey(String rowkey) {
        this.rowkey = rowkey;
    }
    public void setZkPort(String zkPort) {
        this.zkPort = zkPort;
    }
    public String getZkPort() {
        return zkPort;
    }
}

+ 9 - 0
common-logback-starter/src/main/resources/bak_java/hbase/rowkey/IRowkeyGenerate.java

@@ -0,0 +1,9 @@
package com.yihu.base.hbase.rowkey;
/**
 * Created by chenweida on 2018/2/28.
 */
public interface IRowkeyGenerate {
    String getRowkey();
}

+ 48 - 0
common-logback-starter/src/main/resources/bak_java/hbase/rowkey/RowkeyFactory.java

@@ -0,0 +1,48 @@
package com.yihu.base.hbase.rowkey;
import com.yihu.base.hbase.properties.HbaseProperties;
import com.yihu.base.hbase.rowkey.impl.UUIDRowkeyGenerate;
import org.apache.commons.lang3.StringUtils;
/**
 * Created by chenweida on 2018/2/28.
 */
public class RowkeyFactory {
    private volatile static IRowkeyGenerate rowkeyGenerate = null;
    private RowkeyFactory() {
    }
    public static String getRowkey(HbaseProperties hbaseProperties) {
        // lazily initialize the rowkey generator (double-checked locking)
        if (rowkeyGenerate == null) {
            synchronized (RowkeyFactory.class) {
                if (rowkeyGenerate == null) {
                    initIRowkeyGenerate(hbaseProperties);
                }
            }
        }
        return rowkeyGenerate.getRowkey();
    }
    private static void initIRowkeyGenerate(HbaseProperties hbaseProperties) {
        // default to UUID when no strategy is configured
        if (StringUtils.isEmpty(hbaseProperties.getRowkey())) {
            rowkeyGenerate = new UUIDRowkeyGenerate();
        } else {
            switch (hbaseProperties.getRowkey()) {
                case HbaseProperties.rowkeyGenerate_uuid: {
                    rowkeyGenerate = new UUIDRowkeyGenerate();
                    return;
                }
                default: {
                    rowkeyGenerate = new UUIDRowkeyGenerate();
                    return;
                }
            }
        }
    }
}

+ 15 - 0
common-logback-starter/src/main/resources/bak_java/hbase/rowkey/impl/UUIDRowkeyGenerate.java

@@ -0,0 +1,15 @@
package com.yihu.base.hbase.rowkey.impl;
import com.yihu.base.hbase.rowkey.IRowkeyGenerate;
import java.util.UUID;
/**
 * Created by chenweida on 2018/2/28.
 */
public class UUIDRowkeyGenerate implements IRowkeyGenerate {
    @Override
    public String getRowkey() {
        return UUID.randomUUID().toString().replace("-","");
    }
}
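
The switch in RowkeyFactory leaves room for further strategies; a hypothetical salted-timestamp generator, not part of this commit, could plug in as follows (it would also need its own constant and case in HbaseProperties/RowkeyFactory):

package com.yihu.base.hbase.rowkey.impl;
import com.yihu.base.hbase.rowkey.IRowkeyGenerate;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
public class SaltedTimestampRowkeyGenerate implements IRowkeyGenerate {
    private static final int SALT_BUCKETS = 16;
    @Override
    public String getRowkey() {
        // salt prefix spreads writes across regions; reversed timestamp sorts newest rows first
        int salt = ThreadLocalRandom.current().nextInt(SALT_BUCKETS);
        long reversed = Long.MAX_VALUE - System.currentTimeMillis();
        // random suffix avoids collisions within the same millisecond and bucket
        String suffix = UUID.randomUUID().toString().substring(0, 8);
        return String.format("%02d-%019d-%s", salt, reversed, suffix);
    }
}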

+ 12 - 0
common-logback-starter/src/main/resources/bak_java/hbase/timer/MajorTimer.java

@@ -0,0 +1,12 @@
package com.yihu.base.hbase.timer;
import java.util.Timer;
/**
 * Created by chenweida on 2018/2/27.
 * To tune HBase write throughput, automatic major compaction can be disabled
 * via hbase.hregion.majorcompaction and triggered manually in the early hours
 * instead, because store files cannot be operated on while a major compaction runs.
 */
public class MajorTimer extends Timer {
}
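
A sketch of what such a timer might schedule, reusing the old-style HBaseAdmin client API seen elsewhere in this commit; the 03:00 schedule and the table name are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import java.util.Calendar;
import java.util.Timer;
import java.util.TimerTask;

public class MajorTimerDemo {
    public static void main(String[] args) {
        Calendar firstRun = Calendar.getInstance();
        firstRun.set(Calendar.HOUR_OF_DAY, 3);  // placeholder: run at 03:00 local time
        firstRun.set(Calendar.MINUTE, 0);
        firstRun.set(Calendar.SECOND, 0);
        if (firstRun.getTimeInMillis() < System.currentTimeMillis()) {
            firstRun.add(Calendar.DAY_OF_MONTH, 1);  // already past 03:00 today
        }
        new Timer("major-compaction").scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    Configuration conf = HBaseConfiguration.create();
                    HBaseAdmin admin = new HBaseAdmin(conf);  // deprecated API, matches this codebase
                    admin.majorCompact("app_log");            // placeholder table name
                    admin.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, firstRun.getTime(), 24L * 60 * 60 * 1000);  // repeat daily
    }
}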

+ 74 - 0
common-logback-starter/src/main/resources/bak_java/hdfs/HDFSAppender.java

@@ -0,0 +1,74 @@
package bak.hdfs;
import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.core.AppenderBase;
import bak.hdfs.buffer.HDFSBufferConsumer;
import bak.hdfs.buffer.HDFSEventBuffer;
import bak.hdfs.properties.HDFSAppenderProperties;
/**
 * Created by chenweida on 2018/2/26.
 */
public class HDFSAppender extends AppenderBase<LoggingEvent> {
    private HDFSAppenderProperties hdfsAppenderProperties = new HDFSAppenderProperties();
    private HDFSEventBuffer eventBuffer;
    // consumer thread
    private Thread bufferConsumerThread = null;
    @Override
    public void start() {
        super.start();
        // initialize the in-memory buffer queue
        eventBuffer = new HDFSEventBuffer(hdfsAppenderProperties.getBufferProperties());
        // start the consumer thread
        HDFSBufferConsumer bufferConsumer = new HDFSBufferConsumer(eventBuffer, hdfsAppenderProperties);
        bufferConsumerThread = new Thread(bufferConsumer);
        bufferConsumerThread.start();
    }
    @Override
    protected void append(LoggingEvent eventObject) {
        // queue the event for the consumer thread
        eventBuffer.addLogEvent(eventObject);
    }
    //========================================properties========================================
    //==============hdfs start==============
    public void setHosts(String hosts) {
        hdfsAppenderProperties.getHdfsProperties().setHosts(hosts);
    }
    public void setPath(String path) {
        hdfsAppenderProperties.getHdfsProperties().setPath(path);
    }
    public void setFileName(String fileName) {
        hdfsAppenderProperties.getHdfsProperties().setFileName(fileName);
    }
    public void setRolling(String rolling) {
        hdfsAppenderProperties.getHdfsProperties().setRolling(rolling);
    }
    public void setSuffix(String suffix) {
        hdfsAppenderProperties.getHdfsProperties().setSuffix(suffix);
    }
    //==============hdfs end==============
    //==============buffer start==============
    public void setSleepTime(Long sleepTime) {
        hdfsAppenderProperties.getBufferProperties().setSleepTime(sleepTime);
    }
    public void setBufferSize(Integer bufferSize) {
        hdfsAppenderProperties.getBufferProperties().setBufferSize(bufferSize);
    }
    //==============buffer end==============
}

+ 55 - 0
common-logback-starter/src/main/resources/bak_java/hdfs/buffer/HDFSBufferConsumer.java

@@ -0,0 +1,55 @@
package bak.hdfs.buffer;
import ch.qos.logback.classic.spi.ILoggingEvent;
import bak.hdfs.properties.HDFSAppenderProperties;
import bak.hdfs.properties.HDFSProperties;
import bak.hdfs.util.HDFSUtil;
import java.util.ArrayList;
import java.util.List;
/**
 * Created by chenweida on 2018/2/24.
 */
public class HDFSBufferConsumer implements Runnable {
    // buffer queue
    private HDFSEventBuffer eventBuffer;
    // consumer configuration
    private HDFSAppenderProperties hdfsAppenderProperties;
    public HDFSBufferConsumer(HDFSEventBuffer eventBuffer, HDFSAppenderProperties hdfsAppenderProperties) {
        this.eventBuffer = eventBuffer;
        this.hdfsAppenderProperties = hdfsAppenderProperties;
    }
    @Override
    public void run() {
        while (true) {
            try {
                // sleep when the queue is empty
                if (eventBuffer.getBuffer().size() == 0) {
                    sleep();
                    continue;
                }
                List<ILoggingEvent> eventObjectList = new ArrayList<>();
                // drain everything currently queued
                eventBuffer.getBuffer().drainTo(eventObjectList);
                // HDFSUtil.appendFile creates the file when absent, otherwise appends;
                // the path layout hosts + path + "/" + rolling file name is an assumption
                HDFSProperties hdfsProperties = hdfsAppenderProperties.getHdfsProperties();
                for (ILoggingEvent loggingEvent : eventObjectList) {
                    HDFSUtil.appendFile(hdfsProperties.getHosts(),
                            hdfsProperties.getPath() + "/" + hdfsProperties.getFileName(),
                            loggingEvent.getFormattedMessage());
                }
                // sleep between batches
                sleep();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public void sleep() throws Exception {
        // pause the consumer thread
        Thread.sleep(hdfsAppenderProperties.getBufferProperties().getSleepTime());
    }
}

+ 31 - 0
common-logback-starter/src/main/resources/bak_java/hdfs/buffer/HDFSEventBuffer.java

@@ -0,0 +1,31 @@
package bak.hdfs.buffer;
import ch.qos.logback.classic.spi.LoggingEvent;
import bak.hdfs.properties.BufferProperties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Created by chenweida on 2018/2/24.
 */
public class HDFSEventBuffer {
    // bounded buffer queue
    BlockingQueue queue = null;
    public HDFSEventBuffer(BufferProperties bufferProperties) {
        queue = new ArrayBlockingQueue(bufferProperties.getBufferSize());
    }
    public BlockingQueue getBuffer() {
        return queue;
    }
    public void setQueue(BlockingQueue queue) {
        this.queue = queue;
    }
    public void addLogEvent(LoggingEvent eventObject) {
        queue.add(eventObject);
    }
}

+ 27 - 0
common-logback-starter/src/main/resources/bak_java/hdfs/properties/BufferProperties.java

@@ -0,0 +1,27 @@
package bak.hdfs.properties;
/**
 * Created by chenweida on 2018/2/24.
 * Buffer-related configuration
 */
public class BufferProperties {
    private Long sleepTime = 1000L;       // how often the consumer drains the queue, in ms
    private Integer bufferSize = 100000;  // capacity of the buffer queue
    public Long getSleepTime() {
        return sleepTime;
    }
    public void setSleepTime(Long sleepTime) {
        this.sleepTime = sleepTime;
    }
    public Integer getBufferSize() {
        return bufferSize;
    }
    public void setBufferSize(Integer bufferSize) {
        this.bufferSize = bufferSize;
    }
}

+ 27 - 0
common-logback-starter/src/main/resources/bak_java/hdfs/properties/HDFSAppenderProperties.java

@@ -0,0 +1,27 @@
package bak.hdfs.properties;
/**
 * Created by chenweida on 2018/2/26.
 */
public class HDFSAppenderProperties {
    private HDFSProperties hdfsProperties = new HDFSProperties();
    private BufferProperties bufferProperties = new BufferProperties();
    public HDFSProperties getHdfsProperties() {
        return hdfsProperties;
    }
    public void setHdfsProperties(HDFSProperties hdfsProperties) {
        this.hdfsProperties = hdfsProperties;
    }
    public BufferProperties getBufferProperties() {
        return bufferProperties;
    }
    public void setBufferProperties(BufferProperties bufferProperties) {
        this.bufferProperties = bufferProperties;
    }
}

+ 55 - 0
common-logback-starter/src/main/resources/bak_java/hdfs/properties/HDFSProperties.java

@@ -0,0 +1,55 @@
package bak.hdfs.properties;
import com.yihu.base.common.RollingUtil;
/**
 * Created by chenweida on 2018/2/26.
 */
public class HDFSProperties {
    private String hosts;           // e.g. hdfs://192.168.131.240:9000/
    private String path;            // directory where logs are stored
    private String fileName;        // base log file name
    private String rolling;         // rolling granularity: day, week, month, or year
    private String suffix = "log";  // file name suffix
    public String getHosts() {
        return hosts;
    }
    public void setHosts(String hosts) {
        this.hosts = hosts;
    }
    public String getPath() {
        return path;
    }
    public void setPath(String path) {
        this.path = path;
    }
    public String getFileName() {
        // the rolling period and suffix are folded into the returned file name
        return RollingUtil.getRollingAppendFirst(fileName, rolling, suffix);
    }
    public void setFileName(String fileName) {
        this.fileName = fileName;
    }
    public String getRolling() {
        return rolling;
    }
    public void setRolling(String rolling) {
        this.rolling = rolling;
    }
    public String getSuffix() {
        return suffix;
    }
    public void setSuffix(String suffix) {
        this.suffix = suffix;
    }
}

+ 102 - 0
common-logback-starter/src/main/resources/bak_java/hdfs/util/HDFSUtil.java

@@ -0,0 +1,102 @@
package bak.hdfs.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.OutputStream;
import java.net.URI;
/**
 * Created by chenweida on 2018/2/26.
 */
public class HDFSUtil {
    private static Logger logger = LoggerFactory.getLogger(HDFSUtil.class);
    private static String uri = "hdfs://192.168.131.240:9000/";
    /**
     * Append content to a file.
     *
     * @param uri     HDFS address
     * @param message content to append
     * @param path    file path
     *
     * Appending requires Hadoop 2.x and the following property in hdfs-site.xml:
     *
     *   <property>
     *     <name>dfs.support.append</name>
     *     <value>true</value>
     *   </property>
     *
     * The Hadoop API also provides a programmatic switch:
     *
     *   Configuration conf = new Configuration();
     *   conf.setBoolean("dfs.support.append", true);
     *
     * https://www.cnblogs.com/flyeast/archive/2014/07/20/3856356.html
     */
    public static void appendFile(String uri, String path, String message) {
        try {
            Configuration conf = new Configuration();
            conf.setBoolean("dfs.support.append", true);  // enable append mode
            FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
            // create the file first when it does not exist
            if (!exists(fileSystem, uri, path)) {
                fileSystem.create(new Path(path)).close();
            }
            // then append to it
            append(fileSystem, uri, message, path);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }
    /**
     * Check whether the file exists.
     *
     * @param uri     HDFS address
     * @param pathStr file path
     */
    private static boolean exists(FileSystem fileSystem, String uri, String pathStr)
            throws Exception {
        try {
            Path path = new Path(pathStr);
            return fileSystem.exists(path);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * Append to the file.
     *
     * @param fileSystem
     * @param uri
     * @param message
     * @param path
     * @throws Exception
     */
    private static void append(FileSystem fileSystem, String uri, String message, String path)
            throws Exception {
        // the caller has ensured the file exists, so append directly
        OutputStream out = fileSystem.append(new Path(path));
        out.write((message + "\r\n").getBytes("UTF-8"));
        out.flush();
        out.close();
    }
    public static void main(String[] args) throws Exception {
        while (true) {
            String uri = "hdfs://172.17.110.20:8020/";
            String message = "ceshi";
            String path = "/user/root/ceshi123.log";
            HDFSUtil.appendFile(uri, path, message);
        }
    }
}