ABP vNext + HBase:打造超高吞吐分布式列式数据库
ABP vNext + HBase:打造超高吞吐分布式列式数据库 🚀🔥
- 基于 ABP vNext 构建亿级写入、毫秒级查询的列式分布式存储
- 支持 HTTP Basic/Bearer/Kerberos+SPNEGO 多种认证方式 🔐
- 丰富的表达式 DSL、
IAsyncEnumerable
扫描 & Scanner 生命周期管理 🔄- 行键防热点、列簇压缩、TTL、版本控制优化 🔑
- 一键 Docker Compose 部署 🐳 + Kubernetes Secrets & 健康探针示例
- OpenTelemetry 链路追踪 🔍、Prometheus 指标命名规范 & Grafana 仪表盘 📊
- 单元测试(MockHttp)✅、集成测试(Testcontainers)🧪 & Phoenix SQL 层(可选)
- 支持批量写入、限流背压、多租户隔离、TLS 证书 🌐
📑 目录
- ABP vNext + HBase:打造超高吞吐分布式列式数据库 🚀🔥
- 系统架构与场景定位 🧩
- 安全认证与连接注入 🔐
- 配置模型与验证
- Kerberos+SPNEGO 示例 🛡️
- TLS 证书配置示例 🔒
- DTO、Entity 与工具类定义 📦
- Repository 封装与 DSL 查询 ⚙️
- Scanner 生命周期管理 & IAsyncEnumerable 🔄
- 行键 & 列簇设计优化 🔑
- 多租户隔离 🏷️
- Namespace 管理 📂
- 日志写入 API 示例 📝
- 批量写入与背压控制 🚀
- 部署与运维 🐳📈
- Docker Compose
- Kubernetes & Secrets 🔒
- 监控与告警 📊🚨
- OpenTelemetry & Prometheus 指标
- Grafana 仪表盘示例
- Prometheus AlertRule 示例
- 测试与 CI 集成 🧪
- 单元测试(MockHttp)
- 集成测试 & GitHub Actions
- Phoenix SQL 层(可选)🐘
系统架构与场景定位 🧩
应用场景
- 物联网设备时序上报
- 行为日志大规模采集
- 宽表用户画像写入
安全认证与连接注入 🔐
配置模型与验证
public class HBaseSettings
{[Required, Url] public string BaseUrl { get; set; } = null!; [Required] public string Table { get; set; } = null!; public string? Username { get; set; } public string? Password { get; set; } public string? Token { get; set; }
}
// Program.cs / Startup.cs
services.AddOptions<HBaseSettings>().BindConfiguration("HBaseSettings").ValidateDataAnnotations().Validate(o => !string.IsNullOrWhiteSpace(o.Username) || !string.IsNullOrWhiteSpace(o.Token),"必须配置 Username/Password 或 Token");services.AddHttpClient<IHBaseClient, HBaseRestClient>("HBase", (sp, client) =>
{var opts = sp.GetRequiredService<IOptions<HBaseSettings>>().Value;client.BaseAddress = new Uri(opts.BaseUrl);if (!string.IsNullOrEmpty(opts.Token)){client.DefaultRequestHeaders.Authorization =new AuthenticationHeaderValue("Bearer", opts.Token);}else{var cred = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{opts.Username}:{opts.Password}"));client.DefaultRequestHeaders.Authorization =new AuthenticationHeaderValue("Basic", cred);}
})
.AddPolicyHandler(PollyPolicies.DefaultRetry);
Kerberos+SPNEGO 示例 🛡️
var handler = new HttpClientHandler { UseDefaultCredentials = true };
services.AddHttpClient<IHBaseClient, HBaseRestClient>().ConfigurePrimaryHttpMessageHandler(() => handler).AddPolicyHandler(PollyPolicies.DefaultRetry);
TLS 证书配置示例 🔒
var handler = new HttpClientHandler();
handler.ClientCertificates.Add(new X509Certificate2("path/to/client.pfx", "certPassword"));
services.AddHttpClient<IHBaseClient, HBaseRestClient>().ConfigurePrimaryHttpMessageHandler(() => handler).AddPolicyHandler(PollyPolicies.DefaultRetry);
DTO、Entity 与工具类定义 📦
public class DeviceLogInput
{[Required] public string DeviceId { get; set; } = null!;[Required] public DateTime Timestamp { get; set; }public int Level { get; set; }public string JsonData { get; set; } = null!;
}public class DeviceLogEntity
{public string RowKey { get; set; } = null!;public string DeviceId { get; set; } = null!;public DateTime Timestamp { get; set; }public int Level { get; set; }public string JsonData { get; set; } = null!;
}public class HBaseRow
{public string RowKey { get; set; } = null!;public Dictionary<string, string> Columns { get; set; } = new();
}public static class RowKeyHelper
{public static string ReverseTimestamp(DateTime dt) =>(DateTime.MaxValue.Ticks - dt.Ticks).ToString("D19");public static string MakeRowKey(string deviceId, DateTime ts){int slot = Math.Abs(deviceId.GetHashCode()) % 16;return $"{slot:D2}-{ReverseTimestamp(ts)}-{deviceId}";}
}
Repository 封装与 DSL 查询 ⚙️
public interface IFilter { }public class ColumnValueFilter : IFilter
{public string Family { get; }public string Qualifier { get; }public CompareOp Op { get; }public string Value { get; }// 构造与属性省略
}public class HBaseQuery
{public string? RowKeyPrefix { get; set; }public List<IFilter> Filters { get; } = new();public DateTime? FromTime { get; set; }public DateTime? ToTime { get; set; }public int? Limit { get; set; }
}public class HBaseQueryBuilder
{private readonly HBaseQuery _q = new();public HBaseQueryBuilder WithRowPrefix(string p) { _q.RowKeyPrefix = p; return this; }public HBaseQueryBuilder AddFilter(IFilter f) { _q.Filters.Add(f); return this; }public HBaseQueryBuilder WithLimit(int l) { _q.Limit = l; return this; }public HBaseQuery Build() => _q;
}
Scanner 生命周期管理 & IAsyncEnumerable 🔄
public class HBaseRestClient : IHBaseClient, IAsyncDisposable
{// … 前面省略 …public async IAsyncEnumerable<HBaseRow> ScanAsync(HBaseQuery q){var scannerId = await CreateScannerAsync(q);try{while (true){var chunk = await ReadScannerAsync(scannerId);if (!chunk.Any()) yield break;foreach (var row in chunk) yield return row;}}finally{await DeleteScannerAsync(scannerId);}}// Create/Delete/ReadScannerAsync 等实现省略
}
行键 & 列簇设计优化 🔑
# HBase Shell:开启 Snappy、版本与 TTL
create 'device_logs',{ NAME=>'cf_base', VERSIONS=>3 },{ NAME=>'cf_ext', VERSIONS=>1, COMPRESSION=>'SNAPPY', TTL=>86400 }
列簇 | 字段 | 特性 |
---|---|---|
cf_base | id, ts, level | 高频访问字段 |
cf_ext | json_data, attributes | 稀疏字段、压缩候选 |
多租户隔离 🏷️
RowKey = $"{tenantId}-{slot:D2}-{ReverseTimestamp(ts)}-{deviceId}"
Namespace 管理 📂
create_namespace 'tenant_123'
alter 'tenant_123:device_logs', { … }
日志写入 API 示例 📝
[HttpPost("upload")]
public async Task<IActionResult> UploadLog([FromBody] DeviceLogInput input)
{if (!ModelState.IsValid)return BadRequest(ModelState);string rowKey = RowKeyHelper.MakeRowKey(input.DeviceId, input.Timestamp);var entity = new DeviceLogEntity {RowKey = rowKey,DeviceId = input.DeviceId,Timestamp = input.Timestamp,Level = input.Level,JsonData = input.JsonData};try{await _repo.InsertAsync(rowKey, entity);return Ok(new { rowKey });}catch (HBaseException ex){_logger.LogError(ex, "HBase 写入失败:{RowKey}", rowKey);return StatusCode(500, "数据写入失败");}
}
批量写入与背压控制 🚀
public async Task BulkInsertAsync(IEnumerable<DeviceLogEntity> batch)
{var mutations = batch.Select(e => new {Put = new {Row = Convert.ToBase64String(Encoding.UTF8.GetBytes(e.RowKey)),Cells = new[] {// 构建 Cells...}}});await _client.PostAsJsonAsync($"table/{_table}/mutations", new { mutations });
}var channel = Channel.CreateBounded<DeviceLogEntity>(1000);
_ = Task.Run(async () => {await foreach (var item in channel.Reader.ReadAllAsync())await BulkInsertAsync(new[]{ item });
});public Task EnqueueAsync(DeviceLogEntity log) =>channel.Writer.WriteAsync(log).AsTask();
部署与运维 🐳📈
Docker Compose
version: '3.8'services:zookeeper:image: zookeeper:3.7container_name: zkenvironment:ZOO_MY_ID: 1ZOO_SERVERS: server.1=zk:2888:3888ports:- "2181:2181"volumes:- zk-data:/datahbase:image: harisekhon/hbase:2.4.9container_name: hbasedepends_on:- zookeeperenvironment:HBASE_MANAGES_ZK: "false"HBASE_ZOOKEEPER_QUORUM: zkHBASE_ZOOKEEPER_PROPERTY_CLIENT_PORT: 2181ports:- "16010:16010" # HBase Master UI- "9090:9090" # Stargate RESTvolumes:- hbase-data:/data/hbase- hbase-wal:/data/walknox:image: apache/knox:1.6.0container_name: knoxdepends_on:- hbaseports:- "8443:8443" # Knox Gateway HTTPSvolumes:- ./knox/conf:/etc/knox/conf:rovolumes:zk-data: # 持久化 ZooKeeper 数据(对应容器内 /data),保留选举状态与配置信息hbase-data: # 持久化 HBase RegionServer 存储数据(对应容器内 /data/hbase)hbase-wal: # 持久化 HBase 写前日志(WAL)(对应容器内 /data/wal)
Kubernetes & Secrets 🔒
apiVersion: v1
kind: Secret
metadata: { name: hbase-secret }
stringData:HBASE__BASEURL: "https://hbase-gateway.company.com"HBASE__USERNAME: "api_user"HBASE__PASSWORD: "secure!"
---
apiVersion: apps/v1
kind: Deployment
spec:replicas: 3template:spec:containers:- name: webimage: yourregistry/abp-hbase:latestenvFrom: [{ secretRef: { name: hbase-secret }}]livenessProbe:httpGet: { path: /health/live, port: 80 }readinessProbe:httpGet: { path: /health/ready, port: 80 }
监控与告警 📊🚨
OpenTelemetry & Prometheus 指标
services.AddOpenTelemetryTracing(builder =>
{builder.AddAspNetCoreInstrumentation().AddHttpClientInstrumentation().AddSource("HBaseClient").AddJaegerExporter().AddPrometheusMetrics(); // 导出 /metrics
});
- 指标示例:
hbase_request_duration_seconds
hbase_request_failures_total
hbase_scanner_active
Grafana 仪表盘示例
{"dashboard": {"title": "📊 HBase 服务指标","panels": [{"type": "graph","title": "请求时延 (95th)","targets": [{ "expr": "histogram_quantile(0.95, sum(rate(hbase_request_duration_seconds_bucket[5m])) by (le))" }]},{"type": "stat","title": "失败率","targets": [{ "expr": "increase(hbase_request_failures_total[5m]) / increase(hbase_requests_total[5m])" }]}]}
}
Prometheus AlertRule 示例
groups:
- name: hbase-alertsrules:- alert: HBaseHighErrorRateexpr: increase(hbase_request_failures_total[5m]) / increase(hbase_requests_total[5m]) > 0.05for: 2mlabels: { severity: critical }annotations: { summary: "🚨 HBase 错误率超过 5%" }
测试与 CI 集成 🧪
单元测试(MockHttp)
var mock = new MockHttpMessageHandler();
mock.When("*/table/device_logs/rowkey*").Respond("application/json", "{ /* fake row */ }");
var client = mock.ToHttpClient();
var repo = new HBaseRestClient(client, /* options */);
// Assert GetAsync、InsertAsync 行为
集成测试 & GitHub Actions
name: CI Pipeline
on: [push, pull_request]
jobs:test:runs-on: ubuntu-lateststeps:- uses: actions/checkout@v3- name: Setup .NETuses: actions/setup-dotnet@v2with: dotnet-version: '7.0.x'- name: Start HBase Containerrun: docker run -d --name hbase -p 9090:9090 harisekhon/hbase- name: Run Testsrun: dotnet test --no-build- name: Teardownrun: docker stop hbase && docker rm hbase
Phoenix SQL 层(可选)🐘
using var conn = new PhoenixConnection("Server=phoenix;...");
await conn.OpenAsync();
using var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT * FROM device_logs WHERE ts > ?";
cmd.Parameters.Add(new PhoenixParameter("ts", someTimestamp));
using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())Console.WriteLine(reader["json_data"]);