chenweida 6 years ago
parent
commit
c88ed3d2aa

+ 0 - 28
common-logback-starter/src/main/java/com.yihu.base/hbase/properties/HbaseAppenderProperties.java

@@ -1,28 +0,0 @@
package com.yihu.base.hbase.properties;
/**
 * Created by chenweida on 2018/2/24.
 */
public class HbaseAppenderProperties {
    private HbaseProperties hbaseProperties = new HbaseProperties();
    //buffer-related configuration
    private BufferProperties bufferProperties = new BufferProperties();
    public HbaseProperties getHbaseProperties() {
        return hbaseProperties;
    }
    public void setHbaseProperties(HbaseProperties hbaseProperties) {
        this.hbaseProperties = hbaseProperties;
    }
    public BufferProperties getBufferProperties() {
        return bufferProperties;
    }
    public void setBufferProperties(BufferProperties bufferProperties) {
        this.bufferProperties = bufferProperties;
    }
}

+ 0 - 92
common-logback-starter/src/main/java/com.yihu.base/hbase/properties/HbaseProperties.java

@@ -1,92 +0,0 @@
package com.yihu.base.hbase.properties;
/**
 * Created by chenweida on 2018/2/24.
 * HBase-related configuration
 */
public class HbaseProperties {
    public static final String default_zkZnodeParent = "/hbase-unsecure";//default znode parent path in ZooKeeper
    public static final String default_zkPort = "2181";//default ZooKeeper port
    public static final String rowkeyGenerate_uuid = "UUID";
    private String tableName;//table name
    private String familyName;//column family names, comma-separated
    private String zkHosts;//ZooKeeper hosts
    private String zkPort;//ZooKeeper port
    /**
     * For Hortonworks: hconfig.set("zookeeper.znode.parent", "/hbase-unsecure")
     * For Cloudera:    hconfig.set("zookeeper.znode.parent", "/hbase")
     */
    private String zkZnodeParent;//znode parent path in ZooKeeper
    private String hdfsUserName;//HDFS user name
    private String rowkey;//rowkey generation strategy, e.g. UUID
    public String getTableName() {
        return tableName;
    }
    public void setTableName(String tableName) {
        this.tableName = tableName;
    }
    public String getFamilyName() {
        return familyName;
    }
    public String[] getFamilyNames() {
        return familyName.split(",");
    }
    public void setFamilyName(String familyName) {
        this.familyName = familyName;
    }
    public String getZkHosts() {
        return zkHosts;
    }
    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }
    public String getZkZnodeParent() {
        return zkZnodeParent;
    }
    public void setZkZnodeParent(String zkZnodeParent) {
        this.zkZnodeParent = zkZnodeParent;
    }
    public String getHdfsUserName() {
        return hdfsUserName;
    }
    public void setHdfsUserName(String hdfsUserName) {
        this.hdfsUserName = hdfsUserName;
    }
    public String getRowkey() {
        return rowkey;
    }
    public void setRowkey(String rowkey) {
        this.rowkey = rowkey;
    }
    public void setZkPort(String zkPort) {
        this.zkPort = zkPort;
    }
    public String getZkPort() {
        return zkPort;
    }
}
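
For reference, a minimal sketch of how these properties would typically be mapped onto an HBase client Configuration; the key names are the standard HBase client settings, but this helper class itself is not part of the commit:

import com.yihu.base.hbase.properties.HbaseProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
public class HbaseConfigSketch {
    //builds an HBase client Configuration from HbaseProperties (hypothetical helper)
    public static Configuration toConfiguration(HbaseProperties props) {
        Configuration hconfig = HBaseConfiguration.create();
        hconfig.set("hbase.zookeeper.quorum", props.getZkHosts());
        hconfig.set("hbase.zookeeper.property.clientPort",
                props.getZkPort() != null ? props.getZkPort() : HbaseProperties.default_zkPort);
        hconfig.set("zookeeper.znode.parent",
                props.getZkZnodeParent() != null ? props.getZkZnodeParent() : HbaseProperties.default_zkZnodeParent);
        return hconfig;
    }
}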

+ 0 - 9
common-logback-starter/src/main/java/com.yihu.base/hbase/rowkey/IRowkeyGenerate.java

@@ -1,9 +0,0 @@
package com.yihu.base.hbase.rowkey;
/**
 * Created by chenweida on 2018/2/28.
 */
public interface IRowkeyGenerate {
    String getRowkey();
}

+ 0 - 48
common-logback-starter/src/main/java/com.yihu.base/hbase/rowkey/RowkeyFactory.java

@@ -1,48 +0,0 @@
package com.yihu.base.hbase.rowkey;
import com.yihu.base.hbase.properties.HbaseProperties;
import com.yihu.base.hbase.rowkey.impl.UUIDRowkeyGenerate;
import org.apache.commons.lang3.StringUtils;
/**
 * Created by chenweida on 2018/2/28.
 */
public class RowkeyFactory {
    private volatile static IRowkeyGenerate rowkeyGenerate = null;
    private RowkeyFactory() {
    }
    public static String getRowkey(HbaseProperties hbaseProperties) {
        //lazily initialize the rowkey generator (double-checked locking)
        if (rowkeyGenerate == null) {
            synchronized (RowkeyFactory.class) {
                if (rowkeyGenerate == null) {
                    initIRowkeyGenerate(hbaseProperties);
                }
            }
        }
        return rowkeyGenerate.getRowkey();
    }
    private static void initIRowkeyGenerate(HbaseProperties hbaseProperties) {
        //default to UUID when no strategy is configured
        if (StringUtils.isEmpty(hbaseProperties.getRowkey())) {
            rowkeyGenerate = new UUIDRowkeyGenerate();
        } else {
            switch (hbaseProperties.getRowkey()) {
                case HbaseProperties.rowkeyGenerate_uuid: {
                    rowkeyGenerate = new UUIDRowkeyGenerate();
                    return;
                }
                default: {
                    rowkeyGenerate = new UUIDRowkeyGenerate();
                    return;
                }
            }
        }
    }
}
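
The volatile field plus double-checked locking ensures the generator is constructed at most once even under concurrent logging. A hypothetical call site:

        //generator is lazily initialized on first use
        String rowkey = RowkeyFactory.getRowkey(hbaseProperties);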

+ 0 - 15
common-logback-starter/src/main/java/com.yihu.base/hbase/rowkey/impl/UUIDRowkeyGenerate.java

@@ -1,15 +0,0 @@
package com.yihu.base.hbase.rowkey.impl;
import com.yihu.base.hbase.rowkey.IRowkeyGenerate;
import java.util.UUID;
/**
 * Created by chenweida on 2018/2/28.
 */
public class UUIDRowkeyGenerate implements IRowkeyGenerate {
    @Override
    public String getRowkey() {
        return UUID.randomUUID().toString().replace("-","");
    }
}

+ 0 - 12
common-logback-starter/src/main/java/com.yihu.base/hbase/timer/MajorTimer.java

@@ -1,12 +0,0 @@
package com.yihu.base.hbase.timer;
import java.util.Timer;
/**
 * Created by chenweida on 2018/2/27.
 * To improve HBase write performance you can disable automatic major compaction
 * (hbase.hregion.majorcompaction) and instead trigger the merge yourself on a
 * schedule, e.g. in the early hours, because the files cannot be operated on
 * while a major compaction is running.
 */
public class MajorTimer extends Timer {
}
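
The class body is empty in this commit. One plausible use, sketched here under the assumption that the HBase client Admin API is on the classpath, is a nightly task that requests a major compaction for a table; the scheduler class and the 03:00 run time are illustrative:

import java.util.Calendar;
import java.util.TimerTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
public class MajorCompactionScheduler {
    private static final long ONE_DAY_MS = 24L * 60 * 60 * 1000;
    //schedules a daily major compaction at 03:00 for the given table (hypothetical helper)
    public static void schedule(MajorTimer timer, Configuration conf, String table) {
        Calendar firstRun = Calendar.getInstance();
        firstRun.set(Calendar.HOUR_OF_DAY, 3);
        firstRun.set(Calendar.MINUTE, 0);
        firstRun.set(Calendar.SECOND, 0);
        if (firstRun.getTimeInMillis() < System.currentTimeMillis()) {
            firstRun.add(Calendar.DAY_OF_MONTH, 1);//03:00 already passed today
        }
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try (Connection conn = ConnectionFactory.createConnection(conf);
                     Admin admin = conn.getAdmin()) {
                    admin.majorCompact(TableName.valueOf(table));//asynchronous request
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, firstRun.getTime(), ONE_DAY_MS);
    }
}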

+ 0 - 74
common-logback-starter/src/main/java/com.yihu.base/hdfs/HDFSAppender.java

@@ -1,74 +0,0 @@
package com.yihu.base.hdfs;
import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.yihu.base.hdfs.buffer.HDFSBufferConsumer;
import com.yihu.base.hdfs.buffer.HDFSEventBuffer;
import com.yihu.base.hdfs.properties.HDFSAppenderProperties;
/**
 * Created by chenweida on 2018/2/26.
 */
public class HDFSAppender extends AppenderBase<LoggingEvent> {
    private HDFSAppenderProperties hdfsAppenderProperties = new HDFSAppenderProperties();
    private HDFSEventBuffer eventBuffer;
    //consumer thread
    private Thread bufferConsumerThread = null;
    @Override
    public void start() {
        super.start();
        //initialize the in-memory buffer queue
        eventBuffer = new HDFSEventBuffer(hdfsAppenderProperties.getBufferProperties());
        //start the consumer
        HDFSBufferConsumer bufferConsumer = new HDFSBufferConsumer(eventBuffer, hdfsAppenderProperties);
        bufferConsumerThread = new Thread(bufferConsumer);
        bufferConsumerThread.start();
    }
    @Override
    protected void append(LoggingEvent eventObject) {
        //enqueue the event; the consumer thread drains the buffer asynchronously
        eventBuffer.addLogEvent(eventObject);
    }
    //========================================properties========================================
    //==============hdfs start==============
    public void setHosts(String hosts) {
        hdfsAppenderProperties.getHdfsProperties().setHosts(hosts);
    }
    public void setPath(String path) {
        hdfsAppenderProperties.getHdfsProperties().setPath(path);
    }
    public void setFileName(String fileName) {
        hdfsAppenderProperties.getHdfsProperties().setFileName(fileName);
    }
    public void setRolling(String rolling) {
        hdfsAppenderProperties.getHdfsProperties().setRolling(rolling);
    }
    public void setSuffix(String suffix) {
        hdfsAppenderProperties.getHdfsProperties().setSuffix(suffix);
    }
    //==============hdfs end==============
    //==============buffer start==============
    public void setSleepTime(Long sleepTime) {
        hdfsAppenderProperties.getBufferProperties().setSleepTime(sleepTime);
    }
    public void setBufferSize(Integer bufferSize) {
        hdfsAppenderProperties.getBufferProperties().setBufferSize(bufferSize);
    }
    //==============buffer end==============
}
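
The setters above are the knobs that logback's Joran configurator binds from XML, so the appender would presumably be wired up with a fragment like the following; the appender name and values are illustrative, with the defaults taken from HDFSProperties and BufferProperties:

<appender name="HDFS" class="com.yihu.base.hdfs.HDFSAppender">
    <hosts>hdfs://192.168.131.240:9000/</hosts>
    <path>/logs/app</path>
    <fileName>application</fileName>
    <rolling>day</rolling>
    <suffix>log</suffix>
    <sleepTime>1000</sleepTime>
    <bufferSize>100000</bufferSize>
</appender>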

+ 0 - 55
common-logback-starter/src/main/java/com.yihu.base/hdfs/buffer/HDFSBufferConsumer.java

@@ -1,55 +0,0 @@
package com.yihu.base.hdfs.buffer;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.yihu.base.hdfs.properties.HDFSAppenderProperties;
import java.util.ArrayList;
import java.util.List;
/**
 * Created by chenweida on 2018/2/24.
 */
public class HDFSBufferConsumer implements Runnable {
    //buffer
    private HDFSEventBuffer eventBuffer;
    //consumer-related configuration
    private HDFSAppenderProperties hdfsAppenderProperties;
    public HDFSBufferConsumer(HDFSEventBuffer eventBuffer, HDFSAppenderProperties hdfsAppenderProperties) {
        this.eventBuffer = eventBuffer;
        this.hdfsAppenderProperties = hdfsAppenderProperties;
    }
    @Override
    public void run() {
        while (true) {
            try {
                //sleep when the queue is empty
                if (eventBuffer.getBuffer().size() == 0) {
                    sleep();
                    continue;
                }
                List<ILoggingEvent> eventObjectList = new ArrayList<>();
                //drain all pending events from the queue
                eventBuffer.getBuffer().drainTo(eventObjectList);
                //check whether the file already exists in HDFS:
                //if not, create it and write the contents;
                //if it does, just append the contents to the existing file
                //sleep between batches
                sleep();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public void sleep() throws Exception {
        //sleep for the configured consumption interval
        Thread.sleep(hdfsAppenderProperties.getBufferProperties().getSleepTime());
    }
}
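
The actual write step is left as comments in this commit. A sketch of what it might look like inside run(), assuming each event's formatted message is written through the HDFSUtil shown later in this diff; the path construction and newline framing are assumptions:

                //hypothetical flush of the drained batch (not in this commit)
                StringBuilder sb = new StringBuilder();
                for (ILoggingEvent event : eventObjectList) {
                    sb.append(event.getFormattedMessage()).append("\n");
                }
                String hosts = hdfsAppenderProperties.getHdfsProperties().getHosts();
                String path = hdfsAppenderProperties.getHdfsProperties().getPath()
                        + "/" + hdfsAppenderProperties.getHdfsProperties().getFileName();
                HDFSUtil.appendFile(hosts, path, sb.toString());//creates the file if absent, then appends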

+ 0 - 31
common-logback-starter/src/main/java/com.yihu.base/hdfs/buffer/HDFSEventBuffer.java

@@ -1,31 +0,0 @@
package com.yihu.base.hdfs.buffer;
import ch.qos.logback.classic.spi.LoggingEvent;
import com.yihu.base.hdfs.properties.BufferProperties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Created by chenweida on 2018/2/24.
 */
public class HDFSEventBuffer {
    //buffer queue
    BlockingQueue<LoggingEvent> queue = null;
    public HDFSEventBuffer(BufferProperties bufferProperties) {
        queue = new ArrayBlockingQueue<>(bufferProperties.getBufferSize());
    }
    public BlockingQueue<LoggingEvent> getBuffer() {
        return queue;
    }
    public void setQueue(BlockingQueue<LoggingEvent> queue) {
        this.queue = queue;
    }
    public void addLogEvent(LoggingEvent eventObject) {
        //offer rather than add: drop the event when the buffer is full instead of throwing
        queue.offer(eventObject);
    }
}

+ 0 - 27
common-logback-starter/src/main/java/com.yihu.base/hdfs/properties/BufferProperties.java

@@ -1,27 +0,0 @@
package com.yihu.base.hdfs.properties;
/**
 * Created by chenweida on 2018/2/24.
 * Buffer-related configuration
 */
public class BufferProperties {
    private Long sleepTime = 1000L;//how often (ms) the consumer polls the buffer
    private Integer bufferSize = 100000;//capacity of the buffer queue
    public Long getSleepTime() {
        return sleepTime;
    }
    public void setSleepTime(Long sleepTime) {
        this.sleepTime = sleepTime;
    }
    public Integer getBufferSize() {
        return bufferSize;
    }
    public void setBufferSize(Integer bufferSize) {
        this.bufferSize = bufferSize;
    }
}

+ 0 - 27
common-logback-starter/src/main/java/com.yihu.base/hdfs/properties/HDFSAppenderProperties.java

@@ -1,27 +0,0 @@
package com.yihu.base.hdfs.properties;
/**
 * Created by chenweida on 2018/2/26.
 */
public class HDFSAppenderProperties {
    private HDFSProperties hdfsProperties = new HDFSProperties();
    private BufferProperties bufferProperties = new BufferProperties();
    public HDFSProperties getHdfsProperties() {
        return hdfsProperties;
    }
    public void setHdfsProperties(HDFSProperties hdfsProperties) {
        this.hdfsProperties = hdfsProperties;
    }
    public BufferProperties getBufferProperties() {
        return bufferProperties;
    }
    public void setBufferProperties(BufferProperties bufferProperties) {
        this.bufferProperties = bufferProperties;
    }
}

+ 0 - 55
common-logback-starter/src/main/java/com.yihu.base/hdfs/properties/HDFSProperties.java

@@ -1,55 +0,0 @@
package com.yihu.base.hdfs.properties;
import com.yihu.base.common.RollingUtil;
/**
 * Created by chenweida on 2018/2/26.
 */
public class HDFSProperties {
    private String hosts;//e.g. hdfs://192.168.131.240:9000/
    private String path;//directory where logs are stored
    private String fileName;//log file name
    private String rolling;//rolling period: day, week, month or year
    private String suffix = "log";//file suffix
    public String getHosts() {
        return hosts;
    }
    public void setHosts(String hosts) {
        this.hosts = hosts;
    }
    public String getPath() {
        return path;
    }
    public void setPath(String path) {
        this.path = path;
    }
    public String getFileName() {
        return RollingUtil.getRollingAppendFirst(fileName, rolling, suffix);
    }
    public void setFileName(String fileName) {
        this.fileName = fileName;
    }
    public String getRolling() {
        return rolling;
    }
    public void setRolling(String rolling) {
        this.rolling = rolling;
    }
    public String getSuffix() {
        return suffix;
    }
    public void setSuffix(String suffix) {
        this.suffix = suffix;
    }
}
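
RollingUtil is not included in this diff; judging from how getFileName() uses it, getRollingAppendFirst presumably prepends a date token matching the rolling period to the file name. A hypothetical equivalent for the daily case:

        //hypothetical stand-in for RollingUtil.getRollingAppendFirst (not in this commit):
        //e.g. fileName "application", rolling "day", suffix "log" -> "2018-02-26-application.log"
        String stamp = new java.text.SimpleDateFormat("yyyy-MM-dd").format(new java.util.Date());
        String rolled = stamp + "-" + fileName + "." + suffix;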

+ 0 - 102
common-logback-starter/src/main/java/com.yihu.base/hdfs/util/HDFSUtil.java

@@ -1,102 +0,0 @@
package com.yihu.base.hdfs.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.OutputStream;
import java.net.URI;
/**
 * Created by chenweida on 2018/2/26.
 */
public class HDFSUtil {
    private static Logger logger = LoggerFactory.getLogger(HDFSUtil.class);
    private static String uri = "hdfs://192.168.131.240:9000/";
    /**
     * @param uri     HDFS address
     * @param path    file path
     * @param message content to append
     *                <p/>
     *                Appends content to a file.
     *                The append operation requires Hadoop 2.x or later, plus the
     *                following setting in hdfs-site.xml:
     *                <p/>
     *                <property>
     *                <name>dfs.support.append</name>
     *                <value>true</value>
     *                </property>
     *                The Hadoop API also exposes a switch that enables appending in code:
     *                <p/>
     *                Configuration conf = new Configuration();
     *                conf.setBoolean("dfs.support.append", true);
     *                https://www.cnblogs.com/flyeast/archive/2014/07/20/3856356.html
     */
    public static void appendFile(String uri, String path, String message) {
        try {
            Configuration conf = new Configuration();
            conf.setBoolean("dfs.support.append", true);//开启文件追加模式
            FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
            if (exsit(fileSystem, uri, path)) {
            } else {
                //如果不存在就创建文件
                fileSystem.create(new Path(path));
            }
            //直接追加
            append(fileSystem, uri, message, path);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }
    /**
     * @param uri     HDFS address
     * @param pathStr file path
     *                Checks whether the file exists.
     */
    private static boolean exists(FileSystem fileSystem, String uri, String pathStr)
            throws Exception {
        try {
            Path path = new Path(pathStr);
            return fileSystem.exists(path);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * Appends a message to an existing file.
     *
     * @param fileSystem
     * @param uri
     * @param message
     * @param path
     * @throws Exception
     */
    private static void append(FileSystem fileSystem, String uri, String message, String path)
            throws Exception {
        //the caller guarantees the file exists, so append directly
        OutputStream out = fileSystem.append(new Path(path));
        out.write((message + "\r\n").getBytes("UTF-8"));
        out.flush();
        out.close();
    }
    public static void main(String[] args) throws Exception {
        //simple manual test: keeps appending a line to the same file
        while (true) {
            String uri = "hdfs://172.17.110.20:8020/";
            String message = "ceshi";
            String path = "/user/root/ceshi123.log";
            HDFSUtil.appendFile(uri, path, message);
            Thread.sleep(1000);//avoid hammering the NameNode in a tight loop
        }
    }
}