区块链 和 一致性哈希的结合
怎么结合呢?
我们先来回顾一下一致性哈希代码实现里面的结构
// Consistent holds the information about the members of the consistent hash circle.
type Consistent struct {mu sync.RWMutex // 读写锁,用于保护并发访问共享数据config Config // 存储配置信息hasher Hasher // 存储哈希器实例,直接从 config 复制过来sortedSet []uint64 // 存储所有虚拟节点(通过成员名称+副本索引哈希得到)的哈希值,并保持升序排列。这是哈希环的骨架。partitionCount uint64 // 逻辑分区的总数,从 config.PartitionCount 转换而来loads map[string]float64 // 存储每个真实成员当前的负载(即它拥有的逻辑分区数量)members map[string]*Member // 存储所有真实成员的映射,键是成员的 String() 返回值,值是指向 Member 接口的指针partitions map[int]*Member // 核心映射:存储每个逻辑分区ID(0到PartitionCount-1)对应的真实成员(指向 Member 接口的指针)ring map[uint64]*Member // 存储虚拟节点哈希值到真实成员的映射。它是 sortedSet 的补充,sortedSet 提供有序列表,ring 提供哈希值到成员的查找。
}
现在就是加入一个新的服务器后,会修改什么内容
func (c *Consistent) add(member Member) {for i := 0; i < c.config.ReplicationFactor; i++ {key := []byte(fmt.Sprintf("%s%d", member.String(), i))h := c.hasher.Sum64(key)c.ring[h] = &memberc.sortedSet = append(c.sortedSet, h)}// sort hashes ascendinglysort.Slice(c.sortedSet, func(i int, j int) bool {return c.sortedSet[i] < c.sortedSet[j]})// Storing member at this map is useful to find backup members of a partition.c.members[member.String()] = &member
}
我们每次增加服务器都会修改sortedset以及ring(虚拟节点和服务器的映射),但是如果我们能够得到服务器列表,那么是可以在本地构建sortedset和ring
分区和服务器映射需要存储到区块链中吗?不需要
它是通过确定性算法计算出来的:partitions 的内容是根据成员列表、虚拟节点哈希环和有界负载等参数,通过 distributePartitions() 方法计算得出的。这个过程是确定性的,只要输入(成员列表、配置)相同,输出(partitions 映射)也一定相同。
它与 sortedSet 是强关联的:partitions 映射的构建依赖于 sortedSet 和 ring。distributePartitions() 函数正是通过遍历 sortedSet 来为每个逻辑分区寻找归属成员的。
它是高频查询的关键:Consistent.LocateKey 方法的最后一步就是查询 partitions 映射。如果这个映射也需要从区块链上获取,那么每次键的定位都会变成一个慢速的区块链查询,这会彻底破坏系统的性能。
下面开始构建
链码
我们先创建一个链码
找一个位置存下下面的代码
package mainimport ("encoding/json""fmt""log"// "strconv"// "github.com/hyperledger/fabric-chaincode-go/shim""github.com/hyperledger/fabric-contract-api-go/contractapi"
)// ConsistentHashManager 链码的智能合约结构体
type ConsistentHashManager struct {contractapi.Contract
}// 成员列表键,用于在世界状态中存储成员列表
const membersKey = "all_members_key"// Member represents a member in the consistent hash ring.
// This is the same structure we used in our previous local Go program.
type Member struct {Name string `json:"name"`
}// InitLedger 初始化链码。
// 在这里,我们将创建一个空的成员列表并将其写入世界状态。
func (s *ConsistentHashManager) InitLedger(ctx contractapi.TransactionContextInterface) error {log.Printf("Initializing the ledger with an empty member list.")// 创建一个空成员列表members := []Member{}membersJSON, err := json.Marshal(members)if err != nil {return err}// 将空成员列表写入世界状态return ctx.GetStub().PutState(membersKey, membersJSON)
}// AddMember 添加一个新成员到哈希环中。
func (s *ConsistentHashManager) AddMember(ctx contractapi.TransactionContextInterface, memberName string) error {log.Printf("Attempting to add member: %s", memberName)// 从世界状态中读取当前成员列表membersJSON, err := ctx.GetStub().GetState(membersKey)if err != nil {return fmt.Errorf("failed to read from world state: %w", err)}// 如果列表不存在,创建一个空列表if membersJSON == nil {membersJSON = []byte("[]")}// 反序列化成员列表var members []Membererr = json.Unmarshal(membersJSON, &members)if err != nil {return fmt.Errorf("failed to unmarshal member list: %w", err)}// 检查成员是否已存在for _, member := range members {if member.Name == memberName {return fmt.Errorf("member '%s' already exists", memberName)}}// 添加新成员members = append(members, Member{Name: memberName})// 将更新后的列表序列化membersJSON, err = json.Marshal(members)if err != nil {return err}// 将更新后的列表写入世界状态return ctx.GetStub().PutState(membersKey, membersJSON)
}// RemoveMember 从哈希环中移除一个成员。
func (s *ConsistentHashManager) RemoveMember(ctx contractapi.TransactionContextInterface, memberName string) error {log.Printf("Attempting to remove member: %s", memberName)// 从世界状态中读取当前成员列表membersJSON, err := ctx.GetStub().GetState(membersKey)if err != nil {return fmt.Errorf("failed to read from world state: %w", err)}if membersJSON == nil {return fmt.Errorf("member list is empty, cannot remove '%s'", memberName)}// 反序列化成员列表var members []Membererr = json.Unmarshal(membersJSON, &members)if err != nil {return fmt.Errorf("failed to unmarshal member list: %w", err)}// 查找并移除成员found := falsefor i, member := range members {if member.Name == memberName {// 移除切片中的元素members = append(members[:i], members[i+1:]...)found = truebreak}}if !found {return fmt.Errorf("member '%s' not found", memberName)}// 将更新后的列表序列化membersJSON, err = json.Marshal(members)if err != nil {return err}// 将更新后的列表写入世界状态return ctx.GetStub().PutState(membersKey, membersJSON)
}// GetMembers 查询并返回当前所有成员。
func (s *ConsistentHashManager) GetMembers(ctx contractapi.TransactionContextInterface) ([]*Member, error) {log.Println("Querying for all members.")// 从世界状态中读取成员列表membersJSON, err := ctx.GetStub().GetState(membersKey)if err != nil {return nil, fmt.Errorf("failed to read from world state: %w", err)}if membersJSON == nil {log.Println("No members found, returning empty list.")return []*Member{}, nil}// 反序列化成员列表var members []Membererr = json.Unmarshal(membersJSON, &members)if err != nil {return nil, fmt.Errorf("failed to unmarshal member list: %w", err)}// 将 []Member 转换为 []*Member 以符合 contractapi 接口var result []*Memberfor i := range members {result = append(result, &members[i])}return result, nil
}func main() {chaincode, err := contractapi.NewChaincode(&ConsistentHashManager{})if err != nil {log.Panicf("Error creating consistent hash manager chaincode: %v", err)}if err := chaincode.Start(); err != nil {log.Panicf("Error starting consistent hash manager chaincode: %v", err)}
}
然后我们再运行
go mod init consistent-hash-manager # 你可以替换成你喜欢的模块名
go mod tidy
下面就可以去部署网络,部署链码
./network.sh deployCC -ccn consistent-hash-manager -ccp ../chaincode/ConsistentHashManager -ccv 1.0 -ccl go -c mychannel
接着我们运行我们的客户端代码
package mainimport ("bytes""crypto/x509""encoding/json""fmt""log""os""path"// "time""github.com/hyperledger/fabric-gateway/pkg/client""github.com/hyperledger/fabric-gateway/pkg/identity""google.golang.org/grpc""google.golang.org/grpc/credentials"
)// 定义链码名称、通道名称和网络配置路径
const (mspID = "Org1MSP"cryptoPath = "../../test-network/organizations/peerOrganizations/org1.example.com"certPath = cryptoPath + "/users/User1@org1.example.com/msp/signcerts"keyPath = cryptoPath + "/users/User1@org1.example.com/msp/keystore"tlsCertPath = cryptoPath + "/peers/peer0.org1.example.com/tls/ca.crt"peerEndpoint = "localhost:7051" // Fabric Peer 的 Gateway 服务地址gatewayPeer = "peer0.org1.example.com" // Gateway Peer 的主机名,用于 TLS 验证channelName = "mychannel"chaincodeName = "consistent-hash-manager" // 使用我们自己的链码名称
)// Member 结构体,用于解析链码返回的成员列表
type Member struct {Name string `json:"name"`
}// newGrpcConnection 创建与 Gateway 服务器的 gRPC 连接
func newGrpcConnection() *grpc.ClientConn {log.Println("--> Creating gRPC client connection...")certificatePEM, err := os.ReadFile(tlsCertPath)if err != nil {log.Fatalf("Failed to read TLS certificate file at %s: %v", tlsCertPath, err)}certPool := x509.NewCertPool()if !certPool.AppendCertsFromPEM(certificatePEM) {log.Fatalf("Failed to append TLS CA certificate to pool")}transportCredentials := credentials.NewClientTLSFromCert(certPool, gatewayPeer)connection, err := grpc.NewClient(peerEndpoint, grpc.WithTransportCredentials(transportCredentials), grpc.WithBlock())if err != nil {log.Fatalf("Failed to create gRPC connection: %v", err)}log.Println("--> gRPC client connection created successfully.")return connection
}// newIdentity 为 Gateway 连接创建一个客户端身份 (X.509 证书)
func newIdentity() *identity.X509Identity {log.Println("--> Creating new client identity...")certificatePEM, err := readFirstFile(certPath)if err != nil {log.Fatalf("Failed to read certificate file from %s: %v", certPath, err)}certificate, err := identity.CertificateFromPEM(certificatePEM)if err != nil {log.Fatalf("Failed to parse identity certificate: %v", err)}id, err := identity.NewX509Identity(mspID, certificate)if err != nil {log.Fatalf("Failed to create X.509 identity: %v", err)}log.Println("--> Client identity created successfully.")return id
}// newSign 创建一个函数,该函数使用私钥从消息摘要生成数字签名。
func newSign() identity.Sign {log.Println("--> Creating new private key signer...")privateKeyPEM, err := readFirstFile(keyPath)if err != nil {log.Fatalf("Failed to read private key file from %s: %v", keyPath, err)}privateKey, err := identity.PrivateKeyFromPEM(privateKeyPEM)if err != nil {log.Fatalf("Failed to parse private key: %v", err)}sign, err := identity.NewPrivateKeySign(privateKey)if err != nil {log.Fatalf("Failed to create private key signer: %v", err)}log.Println("--> Private key signer created successfully.")return sign
}// readFirstFile 从指定目录中读取第一个文件
func readFirstFile(dirPath string) ([]byte, error) {dir, err := os.Open(dirPath)if err != nil {return nil, fmt.Errorf("failed to open directory %s: %w", dirPath, err)}defer dir.Close()fileNames, err := dir.Readdirnames(1)if err != nil {return nil, fmt.Errorf("failed to read file names from directory %s: %w", dirPath, err)}if len(fileNames) == 0 {return nil, fmt.Errorf("no files found in directory: %s", dirPath)}filePath := path.Join(dirPath, fileNames[0])fileContent, err := os.ReadFile(filePath)if err != nil {return nil, fmt.Errorf("failed to read file %s: %w", filePath, err)}return fileContent, nil
}// formatJSON 格式化 JSON 数据,使其更易读
func formatJSON(data []byte) string {if len(data) == 0 {return "[]" // 如果数据为空,返回一个空的 JSON 数组字符串}var prettyJSON bytes.Bufferif err := json.Indent(&prettyJSON, data, "", " "); err != nil {log.Printf("Warning: Failed to parse JSON, returning raw data. Error: %v", err)return string(data)}return prettyJSON.String()
}func main() {// 1. 创建 gRPC 客户端连接clientConnection := newGrpcConnection()defer clientConnection.Close()// 2. 创建客户端身份和签名函数id := newIdentity()sign := newSign()// 3. 连接 Fabric Gatewaygw, err := client.Connect(id,client.WithSign(sign),client.WithClientConnection(clientConnection),)if err != nil {log.Fatalf("Failed to connect to Gateway: %v", err)}defer gw.Close()// 获取网络和合约对象network := gw.GetNetwork(channelName)contract := network.GetContract(chaincodeName)// memberNameToAdd := "node-a"// // 4. 调用 AddMember 提交事务// log.Printf("\n--> 提交 AddMember 事务以添加 '%s'...", memberNameToAdd)// _, err = contract.SubmitTransaction("AddMember", memberNameToAdd)// if err != nil {// log.Fatalf("Failed to submit AddMember transaction: %v", err)// }// log.Printf("成员 '%s' 添加成功。", memberNameToAdd)// memberNameToAdd2 := "node-b"// // 4. 调用 AddMember 提交事务// log.Printf("\n--> 提交 AddMember 事务以添加 '%s'...", memberNameToAdd2)// _, err = contract.SubmitTransaction("AddMember", memberNameToAdd2)// if err != nil {// log.Fatalf("Failed to submit AddMember transaction: %v", err)// }// log.Printf("成员 '%s' 添加成功。", memberNameToAdd2)// 5. 调用 GetMembers 查询当前成员列表log.Println("\n--> 查询所有成员...")result, err := contract.EvaluateTransaction("GetMembers")if err != nil {log.Fatalf("Failed to evaluate GetMembers transaction: %v", err)}log.Printf("当前成员列表:\n%s", formatJSON(result))// // 6. 调用 RemoveMember 提交事务// log.Printf("\n--> 提交 RemoveMember 事务以移除 '%s'...", memberNameToAdd)// _, err = contract.SubmitTransaction("RemoveMember", memberNameToAdd)// if err != nil {// log.Fatalf("Failed to submit RemoveMember transaction: %v", err)// }// log.Printf("成员 '%s' 移除成功。", memberNameToAdd)// // 7. 再次调用 GetMembers 查询以确认移除// log.Println("\n--> 再次查询所有成员以确认移除...")// result, err = contract.EvaluateTransaction("GetMembers")// if err != nil {// log.Fatalf("Failed to evaluate GetMembers transaction: %v", err)// }// log.Printf("移除后成员列表:\n%s", formatJSON(result))// log.Println("\n所有操作已完成。")
}