当前位置: 首页 > java >正文

基于 Skynet Cluster 模式的完整案例

在这里插入图片描述

目录

  • 1. 目录结构
  • 2. 集群配置文件 (clustermap.lua)
  • 3. 节点配置
    • config_db (数据库节点)
    • config_gate (网关节点)
  • 4. 服务代码
    • simpledb.lua (数据库服务)
  • 5. 节点入口脚本
    • db.lua (数据库节点)
    • gates.lua (网关节点)
  • 6. 运行步骤
    • (1) 启动数据库节点
    • (2) 启动网关节点
  • 7. 关键机制说明
    • 1. 服务发现
    • 2. 动态配置
    • 3. 错误处理
    • 4. 消息可靠性
  • 8. 高级功能扩展
    • 1. 节点监控
    • 2. 负载均衡
    • 3. 跨机房部署
  • 9. 故障模拟测试
    • 场景 1:数据库节点宕机
    • 场景 2:网络分区
  • 10. 总结
    • Cluster 模式核心优势
    • 适用场景
    • 性能建议

以下是一个基于 Skynet Cluster 模式的完整案例,演示跨节点服务注册、调用和动态配置管理:


1. 目录结构

skynet_cluster/
├── config_db          # 数据库节点配置
├── config_gate        # 网关节点配置
├── clustermap.lua     # 集群节点配置文件
├── gate.lua           # 网关节点入口
├── db.lua             # 数据库节点入口
└── simpledb.lua       # 数据库服务代码

2. 集群配置文件 (clustermap.lua)

-- 定义集群节点地址
return {gate = "127.0.0.1:7101",  -- 网关节点db   = "127.0.0.1:7102",  -- 数据库节点-- 可动态添加新节点,例如:-- cache = "192.168.1.100:7103",
}

3. 节点配置

config_db (数据库节点)

thread = 4
logger = nil
harbor = 0
start = "db"
bootstrap = "snlua bootstrap"	-- The service for bootstrap
lualoader = "./lualib/loader.lua"luaservice = "./service/?.lua;./myservice/?.lua;"
lua_path = "./lualib/?.lua;"
lua_cpath = "./luaclib/?.so;"
cpath = "./cservice/?.so"cluster = "./myservice/clustermap.lua"

config_gate (网关节点)

thread = 4
logger = nil
harbor = 0
start = "gates"
bootstrap = "snlua bootstrap"	-- The service for bootstrap
lualoader = "./lualib/loader.lua"luaservice = "./service/?.lua;./myservice/?.lua;"
lua_path = "./lualib/?.lua;"
lua_cpath = "./luaclib/?.so;"
cpath = "./cservice/?.so"cluster = "./myservice/clustermap.lua"

4. 服务代码

simpledb.lua (数据库服务)

local skynet = require "skynet"
require "skynet.manager"	-- import skynet.register
local db = {}local command = {}function command.GET(key)return db[key]
endfunction command.SET(key, value)local last = db[key]db[key] = valuereturn last
endskynet.start(function()skynet.dispatch("lua", function(session, address, cmd, ...)cmd = cmd:upper()if cmd == "PING" thenassert(session == 0)local str = (...)if #str > 20 thenstr = str:sub(1,20) .. "...(" .. #str .. ")"endskynet.error(string.format("%s ping %s", skynet.address(address), str))returnendlocal f = command[cmd]if f thenskynet.ret(skynet.pack(f(...)))elseerror(string.format("Unknown command %s", tostring(cmd)))endend)
--	skynet.traceproto("lua", false)	-- true off tracelogskynet.register "SIMPLEDB"
end)

5. 节点入口脚本

db.lua (数据库节点)

local skynet = require "skynet"
local cluster = require "skynet.cluster"skynet.start(function()-- 启动数据库服务local sdb = skynet.newservice("simpledb")cluster.register("sdb",sdb)print(skynet.call(sdb,"lua","SET","a","showsss"))print(skynet.call(sdb,"lua","GET","a"))-- 打开数据库节点监听cluster.open("db")  -- 对应 clustermap 中的 db 配置skynet.error("Database node ready")
end)

gates.lua (网关节点)

local skynet = require "skynet"
local cluster = require "skynet.cluster"skynet.start(function()-- 打开网关节点监听cluster.open("gate")  -- 对应 clustermap 中的 gate 配置-- 获取数据库服务代理local dbproxy = cluster.proxy("db", "@sdb")skynet.error("proxy",dbproxy)-- 测试跨节点调用skynet.call(dbproxy, "lua", "SET", "player_1001", '{name="Alice", level=10}')local data = skynet.call(dbproxy, "lua", "GET", "player_1001")skynet.error(data)skynet.fork(function()-- skynet.trace("cluster")skynet.error(cluster.call("db", "@sdb", "GET", "player_1001"))end)-- 动态添加新节点示例skynet.fork(function()skynet.sleep(100)cluster.reload({  -- 动态更新集群配置-- gate = "127.0.0.1:7101",    -- 网关节点db   = false,    -- 数据库节点db1 = "127.0.0.1:7103",   -- 新增节点})local sdb1 = skynet.newservice("simpledb")cluster.register("sdb1",sdb1)skynet.error("Cluster config updated!")local dbproxy = cluster.proxy("gate", "@sdb1")cluster.call("gate", "@sdb1", "SET", "player_1001","hishow")skynet.error(cluster.call("gate", "@sdb1", "GET", "player_1001"))end)
end)

6. 运行步骤

(1) 启动数据库节点

./skynet/skynet skynet_cluster/config_db

(2) 启动网关节点

./skynet/skynet skynet_cluster/config_gate

7. 关键机制说明

1. 服务发现

  • 注册服务cluster.register("@sdb") 在数据库节点注册集群可见服务
  • 获取代理cluster.proxy("db", "@sdb") 获取远程服务代理
  • 直接调用cluster.call("db", "@sdb", "GET", key)

2. 动态配置

-- 动态添加新节点
cluster.reload({  -- 动态更新集群配置-- gate = "127.0.0.1:7101",    -- 网关节点db   = false,    -- 数据库节点db1 = "127.0.0.1:7103",   -- 新增节点
})
  • 所有节点需同步更新配置(实际项目需要自行实现配置同步机制)

3. 错误处理

local ok, res = pcall(cluster.call, "db", "@sdb", "GET", "invalid_key")
if not ok thenskynet.error("DB query failed:", res)
end

4. 消息可靠性

  • cluster.call:自动重试直到 TCP 连接恢复
  • cluster.send:不保证送达,适合心跳包等非关键数据

8. 高级功能扩展

1. 节点监控

-- 监控 db 节点状态
skynet.fork(function()while true dolocal ok = pcall(cluster.call, "db", "@sdb", "PING")if not ok thenskynet.error("DB node unreachable!")-- 触发故障转移逻辑endskynet.sleep(200)  -- 每 2 秒检查一次end
end)

2. 负载均衡

-- 随机选择节点
local function get_db_node()local nodes = {"db", "db_backup1", "db_backup2"}return nodes[math.random(1, #nodes)]
endlocal data = cluster.call(get_db_node(), "@sdb", "GET", key)

3. 跨机房部署

-- clustermap.lua
return {gate_beijing = "10.1.1.100:7101",db_shanghai  = "10.2.1.100:7102",cache_guangzhou = "10.3.1.100:7103",
}

9. 故障模拟测试

场景 1:数据库节点宕机

  1. 手动停止数据库节点
  2. 观察网关节点日志:
    [:0000000a] DB node unreachable!
    [:0000000a] Switch to backup node
    
  3. 新请求自动切换到备用节点

场景 2:网络分区

  1. 使用防火墙阻断网关到数据库的端口
    iptables -A INPUT -p tcp --dport 7102 -j DROP
    
  2. 观察网关的 pcall 错误捕获机制
  3. 恢复网络后通信自动恢复

10. 总结

Cluster 模式核心优势

  1. 去中心化架构:无单点故障
  2. 跨物理机支持:天然适应分布式部署
  3. 动态配置管理cluster.reload 实现线上热更新
  4. 细粒度控制:支持多种通信语义(call/send/proxy)

适用场景

  • MMO 游戏服务器:网关/场景/数据库分层部署
  • 微服务架构:订单服务、用户服务独立节点
  • IoT 系统:多个地域的边缘计算节点

性能建议

  • 单个 Cluster 通道支持 10K+ QPS,如需更高吞吐:
    1. 为重要服务建立专用通道
    2. 使用 cluster.open("node", {channel=2}) 开启多通道
    3. 业务层做分流:channel = key % N + 1

完整代码示例 可在 Skynet 官方仓库获取。

http://www.xdnf.cn/news/1469.html

相关文章:

  • 【TUST“码蹄杯”编程之星】4.23 每日一题
  • Spring Boot 请求参数接收控制指南
  • 有源晶振波形特性与测量分析
  • 【Deepseek学习大模型推理】MOONCAKE: A KVCache-centric Architecture 第一部分引言部分
  • Java 异常 SSLException: fatal alert: protocol_version 全解析与解决方案
  • 多智能体系统的中间件架构
  • 爬虫学习总结
  • 02.Python代码Pandas - Series全系列分享(使用.特点.说明.取值.函数)
  • AIGC vs 人类创作者:是竞争还是协作?
  • Python基础语法3
  • 模型量化核心技术解析:从算法原理到工业级实践
  • ActiveMQ 核心概念与消息模型详解(一)
  • 巴西快手kwai短视频广告代投游戏出海营销攻略
  • 基于SpringBoot的校园二手商品在线交易系统+含项目运行说明文档
  • SpringCloud 微服务复习笔记
  • 【k8s】LVS/IPVS的三种模式:NAT、DR、TUN
  • 从白平衡色温坐标系调整的角度消除硬件不一致性引起的偏色问题
  • springmvc入门案例
  • 【高频考点精讲】JavaScript中的访问者模式:从AST解析到数据转换的艺术
  • 游戏引擎学习第240天:将渲染器移至第三层
  • Android SDK 下载及配置 --- app笔记
  • rabbitmq-spring-boot-start版本优化升级
  • 算力租赁:重构数字经济的基础设施革命
  • 线程入门3
  • 格雷希尔气瓶充装连接器:广泛应用于工业气体充注站的快速充装,及气瓶生产厂家的气密性测试
  • 从Nacos derby RCE学习derby数据库的利用
  • 【源码分析】Linux内核ov13850.c
  • [HCTF 2018]WarmUp
  • ospf综合练习
  • 【编译原理】第三章 习题