HDFSUtil.java 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package com.yihu.base.hdfs.util;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import java.io.OutputStream;
  8. import java.net.URI;
  9. /**
  10. * Created by chenweida on 2018/2/26.
  11. */
  12. public class HDFSUtil {
  13. private static Logger logger = LoggerFactory.getLogger(HDFSUtil.class);
  14. private static String uri = "hdfs://192.168.131.240:9000/";
  15. /**
  16. * @param uri hdfs地址
  17. * @param message 追加的内容
  18. * @param path 文件路径
  19. * <p/>
  20. * <p/>
  21. * 追加文件内容
  22. * 上面的解释明显的提到如果需要使用append操作,需要升级到hadoop 2.x版本。并且需要在Conf的hdfs.site.xml文件中加入如下配置:
  23. * <p/>
  24. * <property>
  25. * <name>dfs.support.append</name>
  26. * <value>true</value>
  27. * </property>
  28. * Hadoop的API中也提供了设置项来支持内容追加,代码如下:
  29. * <p/>
  30. * Configuration conf = new Configuration();
  31. * conf.setBoolean("dfs.support.append", true);
  32. * https://www.cnblogs.com/flyeast/archive/2014/07/20/3856356.html
  33. */
  34. public static void appendFile(String uri, String path, String message) {
  35. try {
  36. Configuration conf = new Configuration();
  37. conf.setBoolean("dfs.support.append", true);//开启文件追加模式
  38. FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
  39. if (exsit(fileSystem, uri, path)) {
  40. } else {
  41. //如果不存在就创建文件
  42. fileSystem.create(new Path(path));
  43. }
  44. //直接追加
  45. append(fileSystem, uri, message, path);
  46. } catch (Exception e) {
  47. logger.error(e.getMessage());
  48. }
  49. }
  50. /**
  51. * @param uri hdfs地址
  52. * @param pathStr 文件路径
  53. * 判断文件是否存在
  54. */
  55. private static boolean exsit(FileSystem fileSystem, String uri, String pathStr)
  56. throws Exception {
  57. try {
  58. Path path = new Path(pathStr);
  59. return fileSystem.exists(path);
  60. } catch (Exception e) {
  61. e.printStackTrace();
  62. return false;
  63. }
  64. }
  65. /**
  66. * 追加文件
  67. *
  68. * @param fileSystem
  69. * @param uri
  70. * @param message
  71. * @param path
  72. * @throws Exception
  73. */
  74. private static void append(FileSystem fileSystem, String uri, String message, String path)
  75. throws Exception {
  76. //如果存在就直接追加
  77. OutputStream out = fileSystem.append(new Path(path));
  78. out.write((message + "\r\n").getBytes("UTF-8"));
  79. out.flush();
  80. out.close();
  81. }
  82. public static void main(String[] args) throws Exception {
  83. while (true) {
  84. String uri = "hdfs://172.17.110.20:8020/";
  85. String message = "ceshi";
  86. String path = "/user/root/ceshi123.log";
  87. HDFSUtil.appendFile(uri, path, message);
  88. }
  89. }
  90. }