3.9 HDFS 的 API
3.9.1 访问接口
org.apache.hadoop.fs. FileSystem
是一个通用的文件系统 API,提供了不同文件系统的统一访问方式
org.apache.hadoop.fs. Path
Hadoop 文件系统中统一的文件或目录描述,类似于 java.io.File 对本地文件系统的文件或目录描述。
org.apache, hadoop.conf.Configuration
> 读取、解析配置文件(如core-site.xml/hdfs-default.xml/hdfs-site.xml等),或添加配置的工具类
org.apache, hadoop.fs.FSDataOutputStream
hadoop 中数据输出流的统一封装
org.apache, hadoop.fs.FSDatalnputStream
hadoop 中数据输入流的统一封装
3.9.2 FileSystem
项目引入依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
第一种获取方式
Configuration conf=new Configuration();
conf.set("fs.defaultFS","hdfs://172.16.0.4:9000");
FileSystem fs=FileSystem.get(conf);
2.第二种获取方式
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(new URI("hdfs://172.16.0.4:9000"),conf);
第三种方式
Configuration conf=new Configuration();
conf.set("fs.defaultFS","hdfs://172.16.0.4:9000");
FileSystem fs=FileSystem.newInstance(conf);
第四种方式
Configuration conf=new Configuration();
FileSystem fs=FileSystem.newInstance(new URI("hdfs://172.16.0.4:9000"),conf);
针对hdfs文件系统操作过程中,出现用户权限不允许,修改配置文件hdfs-site.xml并重启集群,添加内容如下:
<property>
<name>dfs.permissions.enabled</name>
<value>true</value>
</property>
3.9.3 读取文件
Configuration conf=new Configuration();
FileSystem fs=FileSystem.newInstance(new URI("hdfs://172.16.0.4:9000"),conf);
Path p=new Path("/user/huzl/a.txt");
FSDataInputStream fsi= fs.open(p);
BufferedReader br=new BufferedReader(new InputStreamReader(fsi));
String str="";
while((str=br.readLine())!=null){
System.out.println(str);
}
3.9.4 写入数据
Configuration conf=new Configuration();
FileSystem fs=FileSystem.newInstance(new URI("hdfs://172.16.0.4:9000"),conf);
Path p=new Path("/user/huzl/a.txt");
FSDataOutputStream fos=fs.create(p);
PrintWriter pw=new PrintWriter(fos);
pw.println("helloworld");
pw.println("test briup");
pw.println("hadoop java");
pw.println("xml mysql");
pw.flush();
pw.close();
3.9.5 创建目录
Configuration conf=new Configuration();
FileSystem fs=FileSystem.newInstance(new URI("hdfs://172.16.0.4:9000"),conf);
Path p=new Path("/user/huzl/dir1");
fs.mkdirs(p);
3.9.6 删除目录或文件
Configuration conf=new Configuration();
FileSystem fs=FileSystem.newInstance(new URI("hdfs://172.16.0.4:9000"),conf);
Path p=new Path("/user/huzl/dir1");
fs.delete(p,true);
3.9.7 列出块信息
Configuration conf=new Configuration();
FileSystem fs=FileSystem.newInstance(new URI("hdfs://172.16.0.4:9000"),conf);
Path p=new Path("/user/huzl/a.txt");
HdfsDataInputStream fis=(HdfsDataInputStream)fs.open(p);
List<LocatedBlock> list=fis.getAllBlocks();
for(LocatedBlock li:list){
DatanodeInfo[] ds=li.getLocations();
System.out.println(Arrays.toString(ds));
System.out.println(li.getBlock().getBlockId());
System.out.println(li.getBlock().getBlockName());
System.out.println(li.getBlock().getNumBytes());
System.out.println("*****");
}
3.9.8 列出元数据
Configuration conf=new Configuration();
FileSystem fs=FileSystem.newInstance(new URI("hdfs://172.16.0.4:9000"),conf);
Path p=new Path("/user/huzl/a.txt");
FileStatus[] stats=fs.listStatus(p);
for(FileStatus f:stats){
System.out.println(f.getAccessTime());
System.out.println(f.getBlockSize());
System.out.println(f.getGroup());
System.out.println(f.getOwner());
System.out.println(f.getPath());
System.out.println(f.getPermission());
}
3.9.9 遍历文件
Configuration conf=new Configuration();
FileSystem fs=FileSystem.newInstance(new URI("hdfs://172.16.0.4:9000"),conf);
Path p=new Path("/");
RemoteIterator<LocatedFileStatus> rs=fs.listFiles(p,true);
while(rs.hasNext()){
LocatedFileStatus lf=rs.next();
System.out.println(lf.getPath().toString());
}
3.9.10 合并小文件
由于 Hadoop 擅长存储大文件,因为大文件的元数据信息比较少,如果 Hadoop 集群当中有大量的小文件,那么每个小文件都需要维护一份元数据信息,会大大的增加集群管理元数据的内存压力,所以在实际工作当中,如果有必要一定要将小文件合并成大文件进行一起处理,可以通过命令行将很多的 hdfs 文件合并成一个大文件下载到本地
hdfs dfs -getmerge *.txt a.txt
既然可以在下载的时候将这些小文件合并成一个大文件一起下载,那么肯定就可以在下载的时候将小文件合并到一个大文件里面去,
Configuration conf=new Configuration();
conf.set("fs.defaultFS","hdfs://172.16.0.4:9000");
Path p=new Path("/user/huzl");
FileSystem fs=FileSystem.get(conf);
System.out.println(fs.getClass());
FileStatus[] files=fs.listStatus(p);
FileOutputStream fos=new FileOutputStream("/Users/huzhongliang/Desktop/video/briup.txt",true);
for(FileStatus fst:files){
FSDataInputStream fis=fs.open(fst.getPath());
IOUtils.copy(fis,fos);
IOUtils.closeQuietly(fis);
}
IOUtils.closeQuietly(fos);
fs.close();
3.9.11 文件上传
Configuration conf=new Configuration();
conf.set("fs.defaultFS","hdfs://172.16.0.4:9000");
Path p=new Path("/user/huzl/briup.txt");
FileSystem fs=FileSystem.get(conf);
fs.copyFromLocalFile(new Path("/Users/huzhongliang/Desktop/video/a.txt"),p);