Hadoop 002 — Common HDFS Commands and Spring Boot Integration
Hadoop HDFS Operations
Table of Contents
- Hadoop HDFS Operations
- 1. HDFS Command-Line Operations
- 1.1 Basic Commands
- 1.1.1 List Directory Contents
- 1.1.2 Create a Directory
- 1.1.3 Delete a File or Directory
- 1.1.4 Upload a File
- 1.1.5 Download a File
- 1.1.6 View File Contents
- 1.1.7 Append Content to a File
- 1.1.7.1 Manually Release a Lease
- 1.1.7.2 Change the Replication Factor
- 1.1.8 Move a File
- 1.1.9 Copy a File
- 1.1.10 Report File Sizes
- 1.1.11 Count Files and Sizes
- 1.2 Administration Commands
- 1.2.1 View the HDFS Status Report
- 1.2.2 Safe Mode Operations
- 2. Operating HDFS from Spring Boot
- 1. Maven Dependencies (pom.xml)
- 2. Externalized YAML Configuration
- 3. HDFS Utility Class
Default web client (file browser) address: http://localhost:9870/explorer.html#/
1. HDFS Command-Line Operations
1.1 Basic Commands
1.1.1 List Directory Contents
Lists the files and directories under the specified path.
hdfs dfs -ls /test
1.1.2 Create a Directory
Creates a directory; the `-p` flag creates parent directories as needed, so a multi-level path can be created in one command.
hdfs dfs -mkdir -p /test/user
1.1.3 Delete a File or Directory
Deletes a file or directory; the optional `-r` flag deletes a directory and its contents recursively.
hdfs dfs -rm -r /test
1.1.4 Upload a File
Uploads a local file to HDFS.
hdfs dfs -put C:\Users\29699\Desktop\DeepSeek从入门到精通-清华.pdf /test/aaa.pdf
1.1.5 Download a File
- Command: hdfs dfs -get <src> <localdst>
- Description: Downloads an HDFS file to the local file system.
Windows:
hdfs dfs -get /test/aaa.pdf C:\Users\29699\Desktop\测试.pdf
Linux:
hdfs dfs -get /user/hadoop/data/file.txt /local/file.txt
1.1.6 View File Contents
Displays the contents of an HDFS file.
hdfs dfs -cat /test/demo.txt
1.1.7 Append Content to a File
- Command:
hdfs dfs -appendToFile <localsrc> <dst>
- Description: Appends the contents of a local file to the end of an HDFS file.
hdfs dfs -appendToFile C:\Users\29699\Desktop\append.txt /test/demo.txt
If the command fails with:
appendToFile: Failed to APPEND_FILE /test/demo.txt for DFSClient_NONMAPREDUCE_1383585836_1 on 127.0.0.1 because this file lease is currently owned by DFSClient_NONMAPREDUCE_-1120973147_1 on 127.0.0.1
Cause
- Lease held by another client
HDFS grants a lease when a file is opened for writing, ensuring that only one client can write to it at a time. If the file's lease is currently held by another client (such as DFSClient_NONMAPREDUCE_-1120973147_1), the new write fails.
- Lease not released
- The previous client may not have closed its output stream properly, so the lease was never released.
- Or the old lease is still within its validity period when the new client attempts the append (the hard lease limit is one hour).
- DataNode failure
If a DataNode becomes unavailable, HDFS tries to replace the failed node, which can also cause lease conflicts.
1.1.7.1 Manually Release a Lease
hdfs debug recoverLease -path /test/demo.txt -retries 10
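The same recovery can also be triggered from Java via DistributedFileSystem#recoverLease. Below is a minimal sketch, not part of the original setup: the NameNode address hdfs://localhost:9000, the hadoop user name, and the LeaseRecoveryDemo class name are assumed placeholders.
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

import java.net.URI;

public class LeaseRecoveryDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder NameNode address and user; adjust to your cluster.
        FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), new Configuration(), "hadoop");
        // recoverLease is specific to DistributedFileSystem, so cast the generic FileSystem handle.
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        Path file = new Path("/test/demo.txt");
        // recoverLease returns true once the lease is released and the file is closed;
        // recovery is asynchronous on the NameNode, so poll a few times like the CLI's -retries.
        boolean recovered = dfs.recoverLease(file);
        for (int i = 0; i < 10 && !recovered; i++) {
            Thread.sleep(1000);
            recovered = dfs.recoverLease(file);
        }
        System.out.println("Lease recovered: " + recovered);
        fs.close();
    }
}
```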
Error 2:
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-8452ef95-afeb-4799-993b-564b8c1e18ca,DISK]], original=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-8452ef95-afeb-4799-993b-564b8c1e18ca,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
The phrase "no more good datanodes being available to try" means the cluster does not have enough available DataNodes to satisfy the replication factor. For example, if the replication factor is set to 3 but the cluster has only 2 DataNodes, the write fails because no additional node can be found.
Here the error occurs because this is a single-DataNode setup while the file was written with a replication factor of 3; files created after lowering the replication factor (see the next section) append normally.
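On a small test cluster this can also be worked around on the client side by relaxing the DataNode replacement policy named in the error message. A minimal sketch, again assuming a placeholder NameNode at hdfs://localhost:9000 and the hadoop user:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.net.URI;

public class AppendClientConfig {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // On a single-node (or very small) cluster there is no spare DataNode to swap into
        // a failed write pipeline, so tell the client not to require a replacement.
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "hadoop");
        System.out.println("Connected to " + fs.getUri());
        fs.close();
    }
}
```
Setting the policy to NEVER trades pipeline durability for availability, so it is only appropriate on development clusters.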
1.1.7.2 Change the Replication Factor
The `-R` flag recursively changes the replication factor of all files in the directory and its subdirectories.
hdfs dfs -setrep -R 1 /test
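The same change can be made per file through the Java API with FileSystem#setReplication, which, like -setrep, only affects files that already exist. A minimal sketch with the same placeholder address and user:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class SetReplicationDemo {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), new Configuration(), "hadoop");
        // Lower the replication factor of an existing file to 1; returns true on success.
        boolean ok = fs.setReplication(new Path("/test/demo.txt"), (short) 1);
        System.out.println("setReplication succeeded: " + ok);
        fs.close();
    }
}
```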
Change the default replication factor
- Edit the configuration file:
Open hdfs-site.xml and change the dfs.replication property:
<property>
  <name>dfs.replication</name>
  <value>2</value> <!-- new replication factor -->
</property>
Effect: newly written data will use this replication factor; existing files keep their current replication factor.
- Restart the services:
Restart the NameNode and DataNode for the change to take effect:
service hadoop-hdfs-namenode restart
service hadoop-hdfs-datanode restart
1.1.8 Move a File
- Command:
hdfs dfs -mv <src> <dst>
- Description: Moves a file or directory within HDFS.
hdfs dfs -mv /test/demo.txt /test/user/demo.txt
1.1.9 Copy a File
- Command:
hdfs dfs -cp <src> <dst>
- Description: Copies a file or directory within HDFS.
hdfs dfs -cp /test/user/demo.txt /test/user/newDemo.txt
1.1.10 Report File Sizes
- Command:
hdfs dfs -du [-s] <path>
- Description: Shows the size of each file under the specified path; the -s flag reports a single aggregate total instead.
hdfs dfs -du -s /test
1.1.11 Count Files and Sizes
- Command:
hdfs dfs -count <path>
- Description: Reports the number of directories, the number of files, and the total content size under the specified path.
hdfs dfs -count /test
1.2 Administration Commands
1.2.1 View the HDFS Status Report
- Description: Shows the total capacity, remaining capacity, and information about each DataNode.
hdfs dfsadmin -report
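The headline numbers of the report can also be read programmatically through FileSystem#getStatus. A minimal sketch with the same placeholder address and user:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;

import java.net.URI;

public class ClusterStatusDemo {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), new Configuration(), "hadoop");
        FsStatus status = fs.getStatus();
        // All figures are in bytes.
        System.out.println("capacity  = " + status.getCapacity());
        System.out.println("used      = " + status.getUsed());
        System.out.println("remaining = " + status.getRemaining());
        fs.close();
    }
}
```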
1.2.2 Safe Mode Operations
- Command:
hdfs dfsadmin -safemode <enter | leave | get | wait>
- Description: Enters, leaves, queries, or waits for safe mode.
While in safe mode the NameNode restricts file system operations: reads are allowed, but writes, deletions, and modifications are not.
hdfs dfsadmin -safemode enter
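Safe mode can likewise be queried or toggled from Java through DistributedFileSystem#setSafeMode. The sketch below only queries the current state (placeholder address and user as before):
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

import java.net.URI;

public class SafeModeDemo {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), new Configuration(), "hadoop");
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        // SAFEMODE_GET only queries the state; SAFEMODE_ENTER / SAFEMODE_LEAVE mirror the CLI options.
        boolean inSafeMode = dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET);
        System.out.println("NameNode in safe mode: " + inSafeMode);
        fs.close();
    }
}
```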
2. Operating HDFS from Spring Boot
1. Maven Dependencies (pom.xml)
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/>
    </parent>
    <groupId>org.example</groupId>
    <artifactId>springboot-hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.25</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.16.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
2. Externalized YAML Configuration
```java
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "hdfs")
public class HDFSProperties {

    /** HDFS NameNode address (fs.defaultFS) */
    private String host;

    /** Base path for uploads */
    private String uploadPath;

    /** User name used for HDFS operations */
    private String username;
}
```
```yaml
hdfs:
  host: hdfs://hadoop001:9000
  upload-path: /user/hadoop
  username: moshangshang
```
3. HDFS Utility Class
```java
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

import javax.servlet.http.HttpServletResponse;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
public class HDFSService {

    @Autowired
    private HDFSProperties hdfsProperties;

    /**
     * Build the HDFS client configuration.
     */
    private Configuration getHDFSConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname", "true");
        configuration.set("fs.defaultFS", hdfsProperties.getHost());
        return configuration;
    }

    /**
     * Get a FileSystem object.
     * A client always operates on HDFS with a user identity. By default the HDFS client API
     * reads it from a JVM parameter (-DHADOOP_USER_NAME=hadoop); it can also be passed in
     * explicitly when constructing the client FileSystem object, as done here.
     */
    private FileSystem getFileSystem() throws Exception {
        return FileSystem.get(new URI(hdfsProperties.getHost()), getHDFSConfiguration(), hdfsProperties.getUsername());
    }

    /**
     * Create a directory recursively.
     */
    public boolean mkdir(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (existFile(path)) {
            return true;
        }
        FileSystem fileSystem = getFileSystem();
        Path srcPath = new Path(hdfsProperties.getUploadPath() + path);
        boolean isOk = fileSystem.mkdirs(srcPath);
        fileSystem.close();
        return isOk;
    }

    /**
     * Check whether an HDFS file exists.
     */
    public boolean existFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        FileSystem fileSystem = getFileSystem();
        Path srcPath = new Path(path);
        return fileSystem.exists(srcPath);
    }

    /**
     * Delete an HDFS file.
     */
    public boolean deleteFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (!existFile(path)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        Path srcPath = new Path(path);
        boolean isOk = fs.deleteOnExit(srcPath);
        fs.close();
        return isOk;
    }

    /**
     * Upload a local file to HDFS.
     */
    public void uploadFile(String path, String uploadPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // Local source path
        Path clientPath = new Path(path);
        // Target HDFS path
        Path serverPath = new Path(uploadPath);
        // Copy via the file system; the first argument controls whether the source is deleted (true = delete, default false)
        fs.copyFromLocalFile(false, clientPath, serverPath);
        fs.close();
    }

    /**
     * Upload a multipart file.
     */
    public void uploadFile(MultipartFile multipartFile, String path) throws Exception {
        FileSystem fs = getFileSystem();
        InputStream in = multipartFile.getInputStream();
        // Output stream on HDFS
        OutputStream out = fs.create(new Path(hdfsProperties.getUploadPath() + path));
        // Pipe the input stream into the output stream
        IOUtils.copyBytes(in, out, 1024, true);
        out.close();
        in.close();
        fs.close();
    }

    /**
     * Read the contents of an HDFS file.
     */
    public void readFile(String filePath) throws Exception {
        FileSystem fs = getFileSystem();
        Path path = new Path(hdfsProperties.getUploadPath() + filePath);
        InputStream in = null;
        try {
            in = fs.open(path);
            // Copy to standard output
            IOUtils.copyBytes(in, System.out, 4096, false);
            System.out.println("\nFile read successfully!");
        } catch (Exception e) {
            System.out.println("\nFailed to read file!");
        } finally {
            IOUtils.closeStream(in);
        }
    }

    /**
     * Download an HDFS file to the local file system.
     */
    public void downloadFile(String path, String downloadPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // HDFS source path
        Path clientPath = new Path(path);
        // Local target path
        Path serverPath = new Path(downloadPath);
        // Copy via the file system; the first argument controls whether the source is deleted (true = delete, default false)
        fs.copyToLocalFile(false, clientPath, serverPath);
        fs.close();
    }

    /**
     * Append the contents of a local file to an HDFS file.
     */
    public void appendFile(String path, String appendPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(appendPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        Path filePath = new Path(hdfsProperties.getUploadPath() + path);
        FileInputStream fis = new FileInputStream(appendPath);
        FSDataOutputStream out = fs.append(filePath);
        // Stream the local file into the appended HDFS file and close both streams
        IOUtils.copyBytes(fis, out, 1024, true);
        fs.close();
    }

    /**
     * Create a file on HDFS and fill it with the given content.
     */
    public void createFile(String filePath, byte[] files) {
        try {
            FileSystem fs = getFileSystem();
            // Target path
            Path path = new Path(hdfsProperties.getUploadPath() + filePath);
            // Open an output stream
            FSDataOutputStream outputStream = fs.create(path);
            outputStream.write(files);
            outputStream.close();
            fs.close();
            System.out.println("File created successfully!");
        } catch (Exception e) {
            System.out.println("Failed to create file!");
        }
    }

    /**
     * Download a file through an HTTP response.
     */
    public void downloadFile(String downPath, String fileName, HttpServletResponse response) throws Exception {
        FSDataInputStream fileinput = null;
        OutputStream os = null;
        FileSystem fs = getFileSystem();
        try {
            response.setContentType("multipart/form-data");
            // Set the character encoding
            response.setCharacterEncoding("UTF-8");
            // Set a content type the browser can render
            response.setContentType("text/html");
            response.setHeader("Content-Disposition", "attachment;filename=" + fileName);
            fileinput = fs.open(new Path(hdfsProperties.getUploadPath() + downPath));
            os = response.getOutputStream();
            int b;
            byte[] buffer = new byte[1024];
            while ((b = fileinput.read(buffer)) != -1) {
                // Write to the response output stream
                os.write(buffer, 0, b);
            }
            os.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(fileinput);
            IOUtils.closeStream(os);
            IOUtils.closeStream(fs);
        }
    }

    /**
     * List HDFS files.
     */
    public List<Map<String, Object>> listFile(String filePath) throws Exception {
        filePath = hdfsProperties.getUploadPath() + filePath;
        FileSystem fs = getFileSystem();
        List<Map<String, Object>> list = new ArrayList<>();
        // Recursively find all files
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path(filePath), true);
        while (listFiles.hasNext()) {
            Map<String, Object> map = new HashMap<>();
            LocatedFileStatus next = listFiles.next();
            String name = next.getPath().getName();
            Path path = next.getPath();
            map.put("fileName", name);
            map.put("filePath", path.toUri());
            list.add(map);
        }
        return list;
    }

    /**
     * Rename a file.
     */
    public boolean renameFile(String oldName, String newName) throws Exception {
        FileSystem fs = getFileSystem();
        Path oldPath = new Path(hdfsProperties.getUploadPath() + oldName);
        Path newPath = new Path(hdfsProperties.getUploadPath() + newName);
        boolean isOk = fs.rename(oldPath, newPath);
        fs.close();
        return isOk;
    }

    /**
     * Open an HDFS file as an InputStream.
     */
    public InputStream readFileInput(String filePath) throws Exception {
        FileSystem fs = getFileSystem();
        Path path = new Path(hdfsProperties.getUploadPath() + filePath);
        return fs.open(path);
    }

    /**
     * Get the block locations of a file in the HDFS cluster.
     */
    public BlockLocation[] getFileBlockLocations(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path srcPath = new Path(hdfsProperties.getUploadPath() + path);
        FileStatus fileStatus = fs.getFileStatus(srcPath);
        return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    }

    /**
     * Read detailed information about an HDFS directory.
     */
    public List<Map<String, Object>> pathInfo(String filePath) throws Exception {
        FileSystem fs = getFileSystem();
        FileStatus[] listStatus = fs.listStatus(new Path(hdfsProperties.getUploadPath() + filePath));
        List<Map<String, Object>> list = new ArrayList<>();
        SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (FileStatus fileStatus : listStatus) {
            Map<String, Object> map = new HashMap<>();
            Date date = new Date(fileStatus.getModificationTime());
            map.put("name", fileStatus.getPath().toUri().getPath().replace(filePath, ""));
            map.put("directory", fileStatus.isDirectory());
            map.put("time", sd.format(date));
            list.add(map);
        }
        // Sort directories before files
        list.sort((o1, o2) -> {
            Boolean directory1 = Boolean.parseBoolean(o1.get("directory").toString());
            Boolean directory2 = Boolean.parseBoolean(o2.get("directory").toString());
            return directory2.compareTo(directory1);
        });
        return list;
    }
}
```
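To show how the service might be wired into the web layer, here is a hypothetical REST controller; the /hdfs endpoints and parameter names are illustrative placeholders and not part of the original project:
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

import javax.servlet.http.HttpServletResponse;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/hdfs")
public class HDFSController {

    @Autowired
    private HDFSService hdfsService;

    /** Upload a file under the configured base path, e.g. POST /hdfs/upload?path=/demo.txt */
    @PostMapping("/upload")
    public String upload(@RequestParam("file") MultipartFile file,
                         @RequestParam("path") String path) throws Exception {
        hdfsService.uploadFile(file, path);
        return "ok";
    }

    /** Recursively list files under the configured base path. */
    @GetMapping("/list")
    public List<Map<String, Object>> list(@RequestParam("path") String path) throws Exception {
        return hdfsService.listFile(path);
    }

    /** Stream a file back to the caller as an attachment. */
    @GetMapping("/download")
    public void download(@RequestParam("path") String path,
                         @RequestParam("name") String name,
                         HttpServletResponse response) throws Exception {
        hdfsService.downloadFile(path, name, response);
    }
}
```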