
Integrating ABP vNext with HDFS Data Lake Storage 🚀


📚 Table of Contents

  • Integrating ABP vNext with HDFS Data Lake Storage 🚀
    • 🧠 Background and Goals
    • 🛠️ Dependencies and Installation
    • 🧱 System Architecture
    • ⚙️ Core Implementation
      • 1️⃣ Configure `appsettings.json`
      • 2️⃣ Custom `HdfsBlobProvider`
      • 3️⃣ Register `HdfsBlobProvider`
      • 4️⃣ Application Service Example
    • 🔐 HDFS HA & Security Configuration
    • 💾 Chunked Upload and Merge
    • 🐶 Unit Test Example
    • 📦 Quick Deployment with Docker Compose
    • 🔍 Monitoring and Operations


🧠 Background and Goals

As multi-source enterprise data (images, logs, documents) grows rapidly, building a file platform that offers massive storage capacity, a unified management view and data lake analytics has become a necessity.

This article builds a visual, controllable and extensible modern data lake file platform on top of ABP vNext BlobStoring and an HDFS HA cluster.


🛠️ Dependencies and Installation

# ABP Blob storage framework
dotnet add package Volo.Abp.BlobStoring
dotnet add package Volo.Abp.BlobStoring.UI

# WebHDFS client
dotnet add package WebHdfs.NET

# Retry and circuit breaker
dotnet add package Polly

# Shell invocation
dotnet add package CliWrap

# Optional: application monitoring
dotnet add package Microsoft.ApplicationInsights.AspNetCore --version 2.21.0

🧱 System Architecture

[Architecture diagram — ABP layer: ABP UI/Swagger → IBlobContainer → BlobContainerFactory → HdfsBlobProvider (Singleton) → WebHdfsClient → WebHDFS API 🌐; HDFS cluster: ZooKeeper, NameNode nn1, NameNode nn2, DataNodes]
  • IBlobContainer: ABP's unified interface for accessing a storage container (a typed-container sketch follows this list)
  • HdfsBlobProvider: inherits BlobProviderBase and adds retry, logging and monitoring
  • HDFS HA: active/standby failover via ZooKeeper; supports HTTPS/TLS 🔐 and Kerberos authentication 🛡️
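
As referenced above, here is a minimal sketch of a typed blob container; the container name "data-lake" and the DataLakeContainer/DataLakeFileManager classes are illustrative assumptions, not part of the article:

using System.IO;
using System.Threading.Tasks;
using Volo.Abp.BlobStoring;
using Volo.Abp.DependencyInjection;

// A dedicated container type lets ABP route data lake blobs separately
// from the default container.
[BlobContainerName("data-lake")]
public class DataLakeContainer
{
}

// The typed container is resolved via IBlobContainer<T> in any service.
public class DataLakeFileManager : ITransientDependency
{
    private readonly IBlobContainer<DataLakeContainer> _container;

    public DataLakeFileManager(IBlobContainer<DataLakeContainer> container)
    {
        _container = container;
    }

    public Task SaveAsync(string name, Stream content)
        => _container.SaveAsync(name, content, true);  // true = overwrite if it exists
}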

⚙️ Core Implementation

1️⃣ Configure appsettings.json

{"Hdfs": {"NameNodeUri": "https://mycluster:50070",   // 支持 HTTP/HTTPS 🌐"User": "hdfs","UseKerberos": true,                        // 是否开启 Kerberos 🔒"KeytabPath": "/etc/security/keytabs/hdfs.headless.keytab"}
}
// HdfsOptions 定义
public class HdfsOptions
{public string NameNodeUri { get; set; } = default!;public string User { get; set; } = default!;public bool UseKerberos { get; set; }public string KeytabPath { get; set; } = default!;
}// 注册配置
context.Services.Configure<HdfsOptions>(context.Configuration.GetSection("Hdfs"));

2️⃣ Custom HdfsBlobProvider

using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
using Microsoft.ApplicationInsights;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Volo.Abp.BlobStoring;
using Volo.Abp.DependencyInjection;
using WebHdfs.Core;

public class HdfsBlobProvider : BlobProviderBase, ISingletonDependency
{
    private readonly WebHdfsClient _client;
    private readonly ILogger<HdfsBlobProvider> _logger;
    private readonly TelemetryClient _telemetry;
    private readonly AsyncPolicy _retryPolicy;

    public HdfsBlobProvider(
        IOptions<HdfsOptions> options,
        ILogger<HdfsBlobProvider> logger,
        TelemetryClient telemetry)
    {
        var opts = options.Value;

        // Kerberos initialization (the keytab is already mounted into the container)
        if (opts.UseKerberos)
        {
            Process.Start("kinit", $"-kt {opts.KeytabPath} {opts.User}");
        }

        _client = new WebHdfsClient(new Uri(opts.NameNodeUri), opts.User);
        _logger = logger;
        _telemetry = telemetry;

        // 3 retries with exponential backoff 🔄
        _retryPolicy = Policy
            .Handle<IOException>()
            .WaitAndRetryAsync(3, i => TimeSpan.FromSeconds(Math.Pow(2, i)));
    }

    public override async Task SaveAsync(BlobProviderSaveArgs args)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            using var buffered = new BufferedStream(args.Stream, 4 * 1024 * 1024);
            await _retryPolicy.ExecuteAsync(() =>
                _client.CreateFileAsync(args.BlobName, buffered, overwrite: true));
            _telemetry.TrackMetric("HDFS_Save_Duration", sw.ElapsedMilliseconds);
            _logger.LogInformation("✔️ Saved blob {Name}", args.BlobName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "❌ Failed to save blob to HDFS: {Name}", args.BlobName);
            throw;
        }
    }

    public override async Task<Stream?> GetOrNullAsync(BlobProviderGetArgs args)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            var stream = await _retryPolicy.ExecuteAsync(() =>
                _client.OpenReadAsync(args.BlobName));
            _telemetry.TrackMetric("HDFS_Get_Duration", sw.ElapsedMilliseconds);
            _logger.LogInformation("📥 Retrieved blob {Name}", args.BlobName);
            return stream;
        }
        catch (FileNotFoundException)
        {
            _logger.LogWarning("⚠️ Blob not found: {Name}", args.BlobName);
            return null;
        }
    }

    public override async Task<bool> DeleteAsync(BlobProviderDeleteArgs args)
    {
        try
        {
            var result = await _retryPolicy.ExecuteAsync(() =>
                _client.DeleteAsync(args.BlobName));
            _logger.LogInformation("🗑️ Blob {Name} deleted: {Status}", args.BlobName, result);
            return result;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "❌ Failed to delete blob from HDFS: {Name}", args.BlobName);
            return false;
        }
    }

    // BlobProviderBase also requires ExistsAsync; implemented here as a read probe
    // so that only the client calls used above are relied upon.
    public override async Task<bool> ExistsAsync(BlobProviderExistsArgs args)
    {
        try
        {
            var stream = await _retryPolicy.ExecuteAsync(() =>
                _client.OpenReadAsync(args.BlobName));
            stream?.Dispose();
            return stream != null;
        }
        catch (FileNotFoundException)
        {
            return false;
        }
    }
}

3️⃣ Register HdfsBlobProvider

Configure<AbpBlobStoringOptions>(options =>
{
    options.Containers.ConfigureDefault(container =>
    {
        container.ProviderType = typeof(HdfsBlobProvider);
    });
});
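
Beyond ConfigureDefault, a specific container can also be bound to the HDFS provider. The sketch below reuses the hypothetical DataLakeContainer shown in the architecture section:

Configure<AbpBlobStoringOptions>(options =>
{
    // Only the "data-lake" container uses HDFS; other containers keep their
    // existing providers.
    options.Containers.Configure<DataLakeContainer>(container =>
    {
        container.ProviderType = typeof(HdfsBlobProvider);
    });
});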

4️⃣ Application Service Example

using Microsoft.AspNetCore.Mvc;
using Volo.Abp.BlobStoring;
using Volo.Abp.Application.Services;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Volo.Abp;

public class FileAppService : ApplicationService
{
    private readonly IBlobContainer _blobContainer;

    public FileAppService(IBlobContainer blobContainer)
    {
        _blobContainer = blobContainer;
    }

    public async Task UploadAsync(string name, Stream content)
    {
        await _blobContainer.SaveAsync(name, content);
        Logger.LogInformation("✅ Uploaded {Name}", name);
    }

    public async Task<Stream> DownloadAsync(string name)
    {
        var stream = await _blobContainer.GetOrNullAsync(name)
            ?? throw new UserFriendlyException("File not found");
        Logger.LogInformation("📤 Downloaded {Name}", name);
        return stream;
    }
}

🔐 HDFS HA & Security Configuration

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://mycluster</value>
  </property>
  <property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>node1:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>node2:8020</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <!-- Kerberos -->
  <property>
    <name>hadoop.security.authentication</name>
    <value>kerberos</value>
  </property>
</configuration>
  • Deployment requirements: 3× JournalNode + 3× ZooKeeper + a Kerberos KDC 🛡️
  • Keytab mounting: mount the keytab into the container at /etc/security/keytabs and set chmod 400
  • HTTPS/TLS: configure HttpClientHandler.ServerCertificateCustomValidationCallback to ignore or validate server certificates 🔐 (see the sketch below)
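
A sketch of the TLS handling mentioned in the last bullet. Whether the WebHDFS client accepts a custom HttpClient or handler depends on the client library you use, and the internal CA name below is a hypothetical example:

using System;
using System.Net.Http;
using System.Net.Security;

var handler = new HttpClientHandler
{
    // Validate normally, but additionally trust certificates issued by the
    // cluster's internal CA. Returning true unconditionally (ignoring all
    // certificate errors) should be limited to development environments.
    ServerCertificateCustomValidationCallback = (request, certificate, chain, errors) =>
        errors == SslPolicyErrors.None
        || certificate?.Issuer == "CN=mycluster-ca"   // hypothetical internal CA
};

var httpClient = new HttpClient(handler)
{
    BaseAddress = new Uri("https://mycluster:50070")
};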

💾 Chunked Upload and Merge

[Sequence diagram: Frontend → FileController → HdfsBlobProvider. POST /upload-part (file, guid, index) → SaveAsync("/temp/guid/part-index") for each part; POST /merge-parts (guid, fileName) → ListStatus("/temp/guid") → CreateFileAsync("/final/fileName"); loop over all parts: OpenReadAsync(part) → AppendFileAsync(final); finally 200 OK is returned to the frontend.]

Using HDFS Append keeps the binary content intact and avoids the limitations of text-based shell commands. 🔗 A controller-level sketch of this flow is shown below.
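
A minimal controller sketch of that flow, assuming illustrative route names, a part-naming scheme and a partCount parameter. For simplicity it concatenates the parts in memory through IBlobContainer, whereas the article's approach appends directly on HDFS via WebHDFS Append:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Volo.Abp.AspNetCore.Mvc;
using Volo.Abp.BlobStoring;

[Route("api/files")]
public class FileController : AbpControllerBase
{
    private readonly IBlobContainer _blobContainer;

    public FileController(IBlobContainer blobContainer)
    {
        _blobContainer = blobContainer;
    }

    // Store each chunk under a per-upload temp prefix: temp/{guid}/part-{index}
    [HttpPost("upload-part")]
    public async Task UploadPartAsync(IFormFile file, Guid guid, int index)
    {
        await using var stream = file.OpenReadStream();
        await _blobContainer.SaveAsync($"temp/{guid}/part-{index}", stream, true);
    }

    // Read the parts in order, concatenate them, and save the final blob.
    [HttpPost("merge-parts")]
    public async Task MergePartsAsync(Guid guid, string fileName, int partCount)
    {
        await using var merged = new MemoryStream();
        for (var i = 0; i < partCount; i++)
        {
            await using var part = await _blobContainer.GetAsync($"temp/{guid}/part-{i}");
            await part.CopyToAsync(merged);
        }

        merged.Position = 0;
        await _blobContainer.SaveAsync($"final/{fileName}", merged, true);
    }
}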


🐶 Unit Test Example

using System.IO;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Moq;
using Polly;
using Volo.Abp.Testing;
using Xunit;
using WebHdfs.Core;
using System;
using Microsoft.ApplicationInsights;
using Microsoft.Extensions.Logging.Abstractions;
using Volo.Abp.BlobStoring;

public class HdfsBlobProvider_Tests : AbpIntegratedTestBase
{
    [Fact]
    public async Task SaveAsync_RetriesOnIOException()
    {
        // Fail twice with IOException, then succeed, to exercise the retry policy.
        var clientMock = new Mock<WebHdfsClient>(MockBehavior.Strict, new Uri("http://x"), "u");
        clientMock
            .SetupSequence(c => c.CreateFileAsync(It.IsAny<string>(), It.IsAny<Stream>(), true))
            .ThrowsAsync(new IOException())
            .ThrowsAsync(new IOException())
            .Returns(Task.CompletedTask);

        var options = Options.Create(new HdfsOptions { NameNodeUri = "http://x", User = "u" });
        var telemetry = new TelemetryClient();

        // Note: for the verification below to observe the mock, HdfsBlobProvider would
        // need a test seam that accepts the WebHdfsClient; the provider shown above
        // always constructs its own client from HdfsOptions.
        var provider = new HdfsBlobProvider(
            options,
            NullLogger<HdfsBlobProvider>.Instance,
            telemetry);

        await provider.SaveAsync(new BlobProviderSaveArgs("test", new MemoryStream()));

        clientMock.Verify(
            c => c.CreateFileAsync("test", It.IsAny<Stream>(), true),
            Times.Exactly(3));
    }
}

📦 Quick Deployment with Docker Compose

version: '3'
services:
  zk:
    image: zookeeper:3.6
    ports: ["2181:2181"]
  journalnode:
    image: bde2020/hadoop-journalnode:2.0.0-hadoop3.2.1-java8
    depends_on: [zk]
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
    depends_on: [zk, journalnode]
    environment:
      - CLUSTER_NAME=mycluster
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
    depends_on: [namenode]

🔍 Monitoring and Operations

[Monitoring flow diagram: blob operations → TrackMetric → Application Insights; blob operations → log information → log store; both feed Grafana/Power BI dashboards]
  • Prometheus/Application Insights instrumentation: record operation latency with TelemetryClient or ABP's ICounter
  • Log correlation: enrich log entries with context such as CorrelationId, BlobName and NodeAddress
  • Health checks: configure ABP HealthChecks to probe the HDFS endpoint ✅ (a sketch follows this list)
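
A sketch of such a health check, probing the WebHDFS REST endpoint; the LISTSTATUS call on the root path and the "hdfs" named HttpClient are assumptions:

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;

public class HdfsHealthCheck : IHealthCheck
{
    private readonly IHttpClientFactory _httpClientFactory;

    public HdfsHealthCheck(IHttpClientFactory httpClientFactory)
    {
        _httpClientFactory = httpClientFactory;
    }

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context, CancellationToken cancellationToken = default)
    {
        try
        {
            // Cheap read against the WebHDFS REST API to confirm the NameNode answers.
            var client = _httpClientFactory.CreateClient("hdfs");
            var response = await client.GetAsync("/webhdfs/v1/?op=LISTSTATUS", cancellationToken);
            return response.IsSuccessStatusCode
                ? HealthCheckResult.Healthy("HDFS reachable")
                : HealthCheckResult.Unhealthy($"HDFS returned {(int)response.StatusCode}");
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy("HDFS unreachable", ex);
        }
    }
}

// Registration (in ConfigureServices):
// services.AddHttpClient("hdfs", c => c.BaseAddress = new Uri("https://mycluster:50070"));
// services.AddHealthChecks().AddCheck<HdfsHealthCheck>("hdfs");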
