当前位置: 首页 > web >正文

SpringBoot与BookKeeper整合,实现金融级别的日志存储系统

BookKeeper的优势

高吞吐量和低延迟

  • 分布式架构: Apache BookKeeper采用分布式的架构设计,能够支持高并发的写入和读取操作。

  • 批量写入: 支持批量写入日志条目,显著提高写入效率。

  • 异步I/O: 使用异步I/O操作,减少等待时间,提升整体性能。

数据一致性和持久性

  • 强一致性保证: BookKeeper提供强一致性保证,确保所有写入的数据都能被正确读取。

  • 多副本复制: 数据在多个Bookies(BookKeeper节点)上进行多副本复制,防止单点故障导致的数据丢失。

  • 自动恢复: 在节点故障时,BookKeeper能够自动检测并恢复数据,确保系统的连续运行。

水平扩展能力

  • 动态扩展: 可以通过增加Bookies来扩展集群规模,适应不断增长的业务需求。

  • 负载均衡: 自动分配负载,确保各节点之间的工作负载平衡,避免热点问题。

  • 灵活性: 支持多种部署方式,包括本地部署、云部署等。

数据加密和访问控制

  • 数据加密: 支持对存储的日志数据进行加密处理,防止未授权访问。

  • 认证和授权: 提供细粒度的权限管理机制,限制不同角色的访问权限。

  • 审计日志: 记录所有对系统的访问和操作,便于追踪和审计。

记得启动ZooKeeper服务器

因为BookKeeper依赖于ZooKeeper来进行元数据管理和协调!!!

代码实操

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.5</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>bookkeeper-springboot-example</artifactId><version>0.0.1-SNAPSHOT</version><name>bookkeeper-springboot-example</name><description>Demo project for Spring Boot and Apache BookKeeper integration</description><properties><java.version>11</java.version></properties><dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Apache BookKeeper Client --><dependency><groupId>org.apache.bookkeeper</groupId><artifactId>bookkeeper-server</artifactId><version>4.18.0</version></dependency><!-- Jackson Databind for JSON processing --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Lombok for reducing boilerplate code --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- Test dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

application.properties

# ZooKeeper 连接字符串
bookkeeper.zk.connectString=localhost:2181
server.port=8080

配置类

package com.example.bookkeeperspringbootexample.config;import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.PreDestroy;
import java.io.IOException;@Configuration
publicclass BookKeeperConfig {privatestaticfinal Logger logger = LoggerFactory.getLogger(BookKeeperConfig.class);@Value("${bookkeeper.zk.connectString}")private String zkConnectString;private BookKeeper bookKeeper;private LedgerHandle ledgerHandle;/*** 初始化BookKeeper客户端** @return BookKeeper实例* @throws IOException 如果初始化失败*/@Beanpublic BookKeeper bookKeeper() throws IOException {ClientConfiguration conf = new ClientConfiguration();conf.setZkServers(zkConnectString);bookKeeper = new BookKeeper(conf);logger.info("BookKeeper客户端已初始化。");return bookKeeper;}/*** 创建一个新的Ledger** @param bookKeeper BookKeeper实例* @return LedgerHandle实例* @throws Exception 如果创建Ledger失败*/@Beanpublic LedgerHandle ledgerHandle(BookKeeper bookKeeper) throws Exception {ledgerHandle = bookKeeper.createLedger(BookKeeper.DigestType.CRC32,"password".getBytes());logger.info("Ledger已创建,ID: {}", ledgerHandle.getId());return ledgerHandle;}/*** 关闭BookKeeper客户端和Ledger*/@PreDestroypublic void shutdown() throws InterruptedException, BookKeeper.BKException {if (ledgerHandle != null) {ledgerHandle.close();logger.info("Ledger已关闭。");}if (bookKeeper != null) {bookKeeper.close();logger.info("BookKeeper客户端已关闭。");}}}

交易的数据模型

package com.example.bookkeeperspringbootexample.model;import lombok.Data;import java.time.LocalDateTime;/*** 表示交易的数据模型*/
@Data
public class Transaction {private Long transactionId; // 交易IDprivate Double amount;      // 交易金额private LocalDateTime timestamp; // 时间戳
}

服务类

package com.example.bookkeeperspringbootexample.service;import com.example.bookkeeperspringbootexample.model.Transaction;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Service
publicclass BookKeeperService {privatestaticfinal Logger logger = LoggerFactory.getLogger(BookKeeperService.class);@Autowiredprivate LedgerHandle ledgerHandle;@Autowiredprivate ObjectMapper objectMapper;/*** 异步添加交易到BookKeeper** @param transaction 交易对象* @return CompletableFuture<Long> 包含新条目的entryId*/public CompletableFuture<Long> addTransaction(Transaction transaction) {try {byte[] logData = objectMapper.writeValueAsBytes(transaction); // 将交易对象转换为字节数组return CompletableFuture.supplyAsync(() -> {try {long entryId = ledgerHandle.addEntry(logData); // 将字节数组添加到Ledgerlogger.info("已添加交易,entryId: {}", entryId);return entryId;} catch (BKException | InterruptedException e) {thrownew RuntimeException(e);}});} catch (IOException e) {thrownew RuntimeException(e);}}/*** 异步从BookKeeper读取交易** @param entryId 条目ID* @return CompletableFuture<Transaction> 包含读取的交易对象*/public CompletableFuture<Transaction> readTransaction(long entryId) {return CompletableFuture.supplyAsync(() -> {try {LedgerSequence seq = ledgerHandle.readEntries(entryId, entryId); // 读取指定entryId的条目if (seq.hasMoreElements()) {LedgerEntry entry = seq.nextElement(); // 获取条目byte[] data = entry.getEntryBytes(); // 获取条目的字节数组logger.info("已读取交易,entryId: {}", entryId);return objectMapper.readValue(data, Transaction.class); // 将字节数组转换为交易对象}thrownew IllegalArgumentException("未找到ID为 " + entryId + " 的交易");} catch (BKException | InterruptedException | ExecutionException | IOException e) {thrownew RuntimeException(e);}});}}

Controller

package com.example.bookkeeperspringbootexample.controller;import com.example.bookkeeperspringbootexample.model.Transaction;
import com.example.bookkeeperspringbootexample.service.BookKeeperService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;import java.util.concurrent.CompletableFuture;@RestController
@RequestMapping("/transactions")
publicclass TransactionController {@Autowiredprivate BookKeeperService bookKeeperService;/*** 添加新的交易** @param transaction 交易对象* @return ResponseEntity<Long> 包含新条目的entryId*/@PostMapping("/")public ResponseEntity<Long> addTransaction(@RequestBody Transaction transaction) {CompletableFuture<Long> futureEntryId = bookKeeperService.addTransaction(transaction); // 异步添加交易try {Long entryId = futureEntryId.get(); // 获取结果return ResponseEntity.ok(entryId); // 返回成功的HTTP响应} catch (InterruptedException | ExecutionException e) {Thread.currentThread().interrupt(); // 中断线程return ResponseEntity.internalServerError().build(); // 返回内部服务器错误}}/*** 根据entryId读取交易** @param entryId 条目ID* @return ResponseEntity<Transaction> 包含读取的交易对象*/@GetMapping("/{entryId}")public ResponseEntity<Transaction> getTransaction(@PathVariable long entryId) {CompletableFuture<Transaction> futureTransaction = bookKeeperService.readTransaction(entryId); // 异步读取交易try {Transaction transaction = futureTransaction.get(); // 获取结果return ResponseEntity.ok(transaction); // 返回成功的HTTP响应} catch (InterruptedException | ExecutionException e) {Thread.currentThread().interrupt(); // 中断线程return ResponseEntity.notFound().build(); // 返回未找到资源}}}

Application

package com.example.bookkeeperspringbootexample;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class BookKeeperSpringBootExampleApplication {public static void main(String[] args) {SpringApplication.run(BookKeeperSpringBootExampleApplication.class, args);}}

测试

添加交易

curl -X POST http://localhost:8080/transactions/ \
-H "Content-Type: application/json" \
-d '{"transactionId": 1, "amount": 100.50, "timestamp": "2025-03-19T21:36:06"}'

Respons:

1

读取交易

curl -X GET http://localhost:8080/transactions/1

Respons:

{"transactionId":1,"amount":100.5,"timestamp":"2025-03-19T21:36:06"}
http://www.xdnf.cn/news/2435.html

相关文章:

  • 小结:BFD
  • 解决SSLError: [SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC] decryption faile的问题
  • React19 useOptimistic 用法
  • 文字光影扫过动效
  • 1999-2022年各省研究与试验发展经费内部支出数据/研发经费内部支出数据/RD经费内部支出数据
  • 鸿蒙NEXT开发正则工具类(ArkTs)
  • Qt/C++开发监控GB28181系统/设备注册/设备注销/密码认证/心跳保活/校时
  • [MCU]SRAM
  • JVM指令手册:深入理解字节码执行机制
  • 图像生成新势力:GPT-Image-1 与 GPT-4o 在智创聚合 API 的较量
  • 大数据学习栈记——Hive4.0.1安装
  • 整合 | 大模型时代:微调技术在医疗智能问答矩阵的实战应用20250427
  • 正则表达式详解
  • π0.5:带开放世界泛化的视觉-语言-动作模型
  • C++学习:六个月从基础到就业——模板编程:模板特化
  • web字符转义
  • Maven概述
  • Leetcode837.新21点
  • GRS认证审核内容?GRS认证基本概述?GRS认证的好处?
  • 【应用密码学】实验二 分组密码(2)
  • 「浏览器即OS」:WebVM技术栈如何用Wasm字节码重构冯·诺依曼体系?
  • 革新桌面自动化:微软UFO²操作系统深度解析与未来展望
  • C++笔记-模板进阶和继承(上)
  • 最佳实践-HENGSHI SENSE 可视化创作中如何引入数据集市的成果
  • 企业数据赋能 | 应用模板分享:汽车销售仪表板
  • Ubuntu下MySQL的安装
  • 前端高频面试题day2
  • 【MySQL】表的CRUD
  • 第1讲、#PyTorch教学环境搭建与Tensor基础操作详解
  • 计算机网络学习笔记 4-6章