This article presents detailed, example-driven coverage of operating HDFS through the Java API. Traversing all files and directories under a given path can be done with the listStatus method; the sections below walk through this and the other common operations with working code.
1. Traversing all files and directories under a directory
The listStatus method covers this requirement.
Its signature is as follows:
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
* @param f given path
* @return the statuses of the files/directories in the given path
* @throws FileNotFoundException when the path does not exist;
* IOException see specific implementation
*/
public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,
                                                       IOException;
As you can see, listStatus only needs a Path argument and returns an array of FileStatus.
A FileStatus carries the following information:
/** Interface that represents the client side information for a file.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileStatus implements Writable, Comparable {
  private Path path;
  private long length;
  private boolean isdir;
  private short block_replication;
  private long blocksize;
  private long modification_time;
  private long access_time;
  private FsPermission permission;
  private String owner;
  private String group;
  private Path symlink;
  ....
As the fields of FileStatus show, it holds the file path, length, whether it is a directory, block_replication, blocksize, and other metadata.
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object HdfsOperation {
  val logger = LoggerFactory.getLogger(this.getClass)

  // Recursively walk the tree rooted at `path`, logging the metadata of
  // every file and directory encountered.
  def tree(sc: SparkContext, path: String): Unit = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val fsPath = new Path(path)
    val status = fs.listStatus(fsPath)
    for (filestatus: FileStatus <- status) {
      logger.error("getPermission is: {}", filestatus.getPermission)
      logger.error("getOwner is: {}", filestatus.getOwner)
      logger.error("getGroup is: {}", filestatus.getGroup)
      logger.error("getLen is: {}", filestatus.getLen)
      logger.error("getModificationTime is: {}", filestatus.getModificationTime)
      logger.error("getReplication is: {}", filestatus.getReplication)
      logger.error("getBlockSize is: {}", filestatus.getBlockSize)
      if (filestatus.isDirectory) {
        val dirpath = filestatus.getPath.toString
        logger.error("Directory path is: {}", dirpath)
        // Recurse into the subdirectory.
        tree(sc, dirpath)
      } else {
        val fullname = filestatus.getPath.toString
        val filename = filestatus.getPath.getName
        logger.error("Full file path is: {}", fullname)
        logger.error("File name is: {}", filename)
      }
    }
  }
}
If a fileStatus is a directory, the tree method calls itself recursively, so the entire subtree gets traversed.
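As a rough usage sketch, the recursive tree method could be driven from a small Spark application like the one below; the object name HdfsOperationDemo, the local[*] master, and the path /tmp/demo are placeholders invented for this example, not part of the original code.
import org.apache.spark.sql.SparkSession

object HdfsOperationDemo {
  def main(args: Array[String]): Unit = {
    // Build a SparkSession; appName and master are illustrative values only.
    val spark = SparkSession.builder()
      .appName("HdfsOperationDemo")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext
    // "/tmp/demo" is a hypothetical HDFS path used only for this example.
    HdfsOperation.tree(sc, "/tmp/demo")
    spark.stop()
  }
}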
2. Traversing all files
The method above traverses both files and directories. To traverse files only, use the listFiles method.
// List files only (no directories) under `path`.
def findFiles(sc: SparkContext, path: String) = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val fsPath = new Path(path)
  // The second argument tells listFiles to recurse into subdirectories.
  val files = fs.listFiles(fsPath, true)
  while (files.hasNext) {
    val filestatus = files.next()
    val fullname = filestatus.getPath.toString
    val filename = filestatus.getPath.getName
    logger.error("Full file path is: {}", fullname)
    logger.error("File name is: {}", filename)
    logger.error("File length is: {}", filestatus.getLen)
  }
}
/**
* List the statuses and block locations of the files in the given path.
*
* If the path is a directory,
* if recursive is false, returns files in the directory;
* if recursive is true, return files in the subtree rooted at the path.
* If the path is a file, return the file's status and block locations.
*
* @param f is the path
* @param recursive if the subdirectories need to be traversed recursively
*
* @return an iterator that traverses statuses of the files
*
* @throws FileNotFoundException when the path does not exist;
* IOException see specific implementation
*/
public RemoteIterator<LocatedFileStatus> listFiles(
    final Path f, final boolean recursive)
    throws FileNotFoundException, IOException {
  ...
As the source shows, listFiles returns an iterable RemoteIterator<LocatedFileStatus>, whereas listStatus returns an array. In addition, listFiles returns only files.
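Because RemoteIterator is a Hadoop interface rather than a Scala Iterator, it is often convenient to drain it into an ordinary Scala collection first. Below is a minimal sketch of that pattern; the helper name collectFilePaths and its return type are choices made for this example, not part of the original code.
// Collect the full path of every file under `path` into a Scala List.
// Sketch only; the name `collectFilePaths` is invented for this example.
def collectFilePaths(sc: SparkContext, path: String): List[String] = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val files = fs.listFiles(new Path(path), true)
  val paths = scala.collection.mutable.ListBuffer[String]()
  while (files.hasNext) {
    paths += files.next().getPath.toString
  }
  paths.toList
}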
3. Creating a directory
def mkdirToHdfs(sc: SparkContext, path: String) = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  // mkdirs behaves like `mkdir -p`: missing parent directories are created too.
  val result = fs.mkdirs(new Path(path))
  if (result) {
    logger.error("mkdirs succeeded!")
  } else {
    logger.error("mkdirs failed!")
  }
}
4. Deleting a directory
def deleteOnHdfs(sc: SparkContext, path: String) = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  // The second argument enables recursive deletion of the directory's contents.
  val result = fs.delete(new Path(path), true)
  if (result) {
    logger.error("delete succeeded!")
  } else {
    logger.error("delete failed!")
  }
}
5. Uploading a file
def uploadToHdfs(sc: SparkContext, localPath: String, hdfsPath: String): Unit = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  // Copy a file from the local filesystem into HDFS.
  fs.copyFromLocalFile(new Path(localPath), new Path(hdfsPath))
  fs.close()
}
6. Downloading a file
def downloadFromHdfs(sc: SparkContext, localPath: String, hdfsPath: String) = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  fs.copyToLocalFile(new Path(hdfsPath), new Path(localPath))
  fs.close()
}
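Putting the helpers together, a rough end-to-end sketch could look like the following; every path below is a made-up placeholder. One caveat: uploadToHdfs and downloadFromHdfs call fs.close(), and FileSystem.get normally hands back a JVM-cached instance, so other code sharing that instance (including Spark itself) may later fail with "Filesystem closed" errors; reusing a single FileSystem handle and not closing it is a common alternative.
// End-to-end sketch: create a directory, upload a local file into it,
// list what ended up there, download it back, then clean up.
// All paths are placeholders invented for this example.
def roundTripDemo(sc: SparkContext): Unit = {
  mkdirToHdfs(sc, "/tmp/demo")
  uploadToHdfs(sc, "/home/user/local.txt", "/tmp/demo/local.txt")
  findFiles(sc, "/tmp/demo")
  downloadFromHdfs(sc, "/home/user/local-copy.txt", "/tmp/demo/local.txt")
  deleteOnHdfs(sc, "/tmp/demo")
}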
This concludes the detailed examples of operating HDFS through the Java API.