
Enterprise Architect Capstone Project, Case 1 (Cluster Setup for Core Components + Spring Boot Integration)

Architecture Diagram

User request → Nginx → Spring Cloud Gateway → microservice cluster
                                  ↓
MySQL cluster, master-slave replication (ShardingSphere)   Redis cluster, master-slave replication (Sentinel)
ES cluster                                                  MongoDB cluster (sharded)
RocketMQ cluster                                            Seata distributed transactions

Building the Clusters

Nginx Cluster and Configuration

                 ┌─────────────────┐
                 │  Cloud firewall  │
                 │ (security group) │
                 └────────┬────────┘
                          │
                 ┌─────────────────┐
                 │  Cloud load      │
                 │  balancer (opt.) │
                 └────────┬────────┘
                          │
                          ▼
         ┌───────────────────────────────────┐
         │       Floating VIP: 192.168.1.100 │
         └───────────────────────────────────┘
                          │
         ┌────────────────┼─────────────────┐
         ▼                ▼                 ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Nginx master  │ │ Nginx backup  │ │ Nginx node 3  │
│ 192.168.1.10  │ │ 192.168.1.11  │ │ 192.168.1.12  │
│ (firewalled)  │ │ (firewalled)  │ │ (firewalled)  │
└───────────────┘ └───────────────┘ └───────────────┘
         │                │                 │
         └────────────────┼─────────────────┘
                          ▼
            ┌─────────────────────────┐
            │ Application server pool │
            │ (firewalled)            │
            │  ┌───┐  ┌───┐  ┌───┐    │
            │  │App│  │App│  │App│    │
            │  └───┘  └───┘  └───┘    │
            │     192.168.1.20+       │
            └─────────────────────────┘

①. Environment Preparation

Server planning

  • Nginx nodes: 3 (a minimum of 2 for high availability)
  • Application servers: at least 2
  • Operating system: CentOS 8 / Ubuntu 20.04 LTS

Software requirements

  • Nginx 1.18+
  • Keepalived
  • Fail2ban (intrusion prevention)
  • Certbot (SSL certificate management)
  • Backend application (e.g. Tomcat, Node.js)

②. Install Nginx

Run on all Nginx nodes:

# Update the system
sudo apt update && sudo apt upgrade -y  # Ubuntu
# or
sudo yum update -y  # CentOS

# Create an admin user
sudo useradd -m -s /bin/bash admin
sudo passwd admin
sudo usermod -aG sudo admin  # Ubuntu
# or
sudo usermod -aG wheel admin  # CentOS

# Disable root SSH login
sudo sed -i 's/^PermitRootLogin yes/PermitRootLogin no/' /etc/ssh/sshd_config
sudo systemctl restart sshd

# Configure the firewall (Ubuntu, ufw)
sudo ufw enable
sudo ufw default deny incoming
sudo ufw default allow outgoing
sudo ufw allow from 192.168.1.0/24 to any port 22   # SSH from the internal network
sudo ufw allow from 192.168.1.0/24 to any port 80   # HTTP from the internal network
sudo ufw allow from 192.168.1.0/24 to any port 443  # HTTPS from the internal network

# CentOS uses firewalld
sudo systemctl enable firewalld
sudo systemctl start firewalld
sudo firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="192.168.1.0/24" port port="22" protocol="tcp" accept'
sudo firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="192.168.1.0/24" port port="80" protocol="tcp" accept'
sudo firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="192.168.1.0/24" port port="443" protocol="tcp" accept'
sudo firewall-cmd --reload

# Install Fail2ban (brute-force protection)
sudo apt install fail2ban -y  # Ubuntu
# or
sudo yum install epel-release && sudo yum install fail2ban -y  # CentOS

# Configure Fail2ban
sudo cp /etc/fail2ban/jail.conf /etc/fail2ban/jail.local
sudo tee /etc/fail2ban/jail.d/sshd.local > /dev/null <<EOF
[sshd]
enabled = true
port = ssh
logpath = %(sshd_log)s
backend = %(sshd_backend)s
maxretry = 3
bantime = 3600
findtime = 600
EOF

sudo systemctl enable fail2ban
sudo systemctl start fail2ban

# Install Nginx
# Ubuntu
sudo apt install nginx -y
# CentOS
sudo yum install epel-release
sudo yum install nginx -y

# Create a dedicated user and group for the Nginx workers
sudo groupadd nginxuser
sudo useradd -g nginxuser -s /bin/false nginxuser

# Back up the original configuration
sudo cp /etc/nginx/nginx.conf /etc/nginx/nginx.conf.backup

③. Configure Nginx Load Balancing

Edit /etc/nginx/nginx.conf on all Nginx nodes:

# Run as a non-privileged user and group
user nginxuser nginxuser;

# Number of worker processes (tune to the number of CPU cores)
worker_processes auto;

# Error log
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;

events {
    worker_connections 1024;
    use epoll;
    multi_accept on;
}

http {
    # Security headers
    add_header X-Frame-Options DENY always;
    add_header X-Content-Type-Options nosniff always;
    add_header X-XSS-Protection "1; mode=block" always;
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;

    # Basic settings
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;
    keepalive_timeout 65;
    types_hash_max_size 2048;
    server_tokens off;  # hide the Nginx version
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    # Log format
    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';
    access_log /var/log/nginx/access.log main;

    # Request size and buffer limits
    client_max_body_size 10m;
    client_body_buffer_size 128k;
    client_header_buffer_size 1k;
    large_client_header_buffers 4 4k;

    # Connection limits
    limit_conn_zone $binary_remote_addr zone=perip:10m;
    limit_conn_zone $server_name zone=perserver:10m;

    upstream backend {
        # Load-balancing strategy
        least_conn;
        # Backend server list
        server 192.168.1.20:8080 weight=3 max_fails=3 fail_timeout=30s;
        server 192.168.1.21:8080 weight=2 max_fails=3 fail_timeout=30s;
        server 192.168.1.22:8080 weight=1 max_fails=3 fail_timeout=30s;
        # Keep upstream connections alive
        keepalive 32;
    }

    server {
        listen 80;
        server_name _;

        # Connection limits
        limit_conn perip 10;
        limit_conn perserver 100;

        # Static file caching
        location ~* \.(jpg|jpeg|png|gif|ico|css|js)$ {
            expires 30d;
            add_header Cache-Control "public, immutable";
        }

        location / {
            # Basic access control
            deny 192.168.1.99;  # example: block a specific IP

            proxy_pass http://backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # Hide backend implementation details
            proxy_hide_header X-Powered-By;
            proxy_hide_header Server;

            # Timeouts
            proxy_connect_timeout 5s;
            proxy_send_timeout 10s;
            proxy_read_timeout 10s;

            # Passive health handling
            proxy_next_upstream error timeout invalid_header http_500 http_502 http_503 http_504;
        }

        # Nginx status page (restricted access)
        location /nginx_status {
            stub_status on;
            access_log off;
            allow 192.168.1.0/24;
            deny all;
            auth_basic "Restricted Access";
            auth_basic_user_file /etc/nginx/htpasswd;
        }

        # Deny access to hidden files
        location ~ /\. {
            deny all;
            access_log off;
            log_not_found off;
        }

        # Health check endpoint
        location /health {
            access_log off;
            return 200 "healthy\n";
            add_header Content-Type text/plain;
        }
    }

    # Block specific User-Agents
    map $http_user_agent $blocked_agent {
        default 0;
        ~*malicious 1;
        ~*bot 1;
        ~*spider 1;
        ~*curl 1;
        ~*wget 1;
    }
}

Create the authentication file for the status page:

sudo sh -c "echo -n 'admin:' >> /etc/nginx/htpasswd"
sudo sh -c "openssl passwd -apr1 >> /etc/nginx/htpasswd"
# enter the password when prompted

SSL/TLS configuration (optional but recommended)

# Install Certbot
sudo apt install certbot python3-certbot-nginx -y  # Ubuntu
# or
sudo yum install certbot python3-certbot-nginx -y  # CentOS

# Obtain a certificate (if you have a domain)
# sudo certbot --nginx -d your-domain.com

# Or create a self-signed certificate for testing
sudo mkdir -p /etc/ssl/certs/nginx
sudo openssl req -x509 -nodes -days 365 -newkey rsa:2048 \
  -keyout /etc/ssl/certs/nginx/nginx.key \
  -out /etc/ssl/certs/nginx/nginx.crt \
  -subj "/C=CN/ST=Beijing/L=Beijing/O=Company/CN=nginx-cluster"

④. Install and Configure Keepalived for High Availability

Install Keepalived on the master and backup Nginx nodes:

sudo apt install keepalived -y  # Ubuntu
# or
sudo yum install keepalived -y  # CentOS

Master node Keepalived configuration (/etc/keepalived/keepalived.conf):

global_defs {
    router_id nginx_master
    enable_script_security
}

vrrp_script chk_nginx {
    script "/usr/bin/killall -0 nginx"
    interval 2
    weight -5
    fall 2
    rise 1
    timeout 2
}

vrrp_instance VI_1 {
    state MASTER
    interface eth0
    virtual_router_id 51
    priority 101
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass secure_password_123  # use a strong password
    }
    virtual_ipaddress {
        192.168.1.100/24 dev eth0
    }
    track_script {
        chk_nginx
    }
    # Safety settings
    garp_master_delay 1
    garp_master_refresh 5
}

Backup node Keepalived configuration:

global_defs {
    router_id nginx_backup
}

vrrp_script chk_nginx {
    script "/usr/bin/killall -0 nginx"
    interval 2
    weight -5
    fall 2
    rise 1
}

vrrp_instance VI_1 {
    state BACKUP
    interface eth0
    virtual_router_id 51
    priority 100
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass secure_password_123  # must match the master's auth_pass
    }
    virtual_ipaddress {
        192.168.1.100/24 dev eth0
    }
    track_script {
        chk_nginx
    }
}

Configure system services and permissions

# Set file and directory permissions
sudo chown -R nginxuser:nginxuser /var/log/nginx
sudo chmod -R 755 /var/log/nginx
sudo chmod 640 /etc/nginx/nginx.conf
sudo chmod 600 /etc/ssl/certs/nginx/nginx.key

# Create a service check script
sudo tee /usr/local/bin/check_nginx.sh > /dev/null <<'EOF'
#!/bin/bash
if ! pgrep -x "nginx" > /dev/null; then
    systemctl start nginx
    sleep 2
    if ! pgrep -x "nginx" > /dev/null; then
        exit 1
    fi
fi
exit 0
EOF

sudo chmod +x /usr/local/bin/check_nginx.sh

# Configure log rotation
sudo tee /etc/logrotate.d/nginx > /dev/null <<EOF
/var/log/nginx/*.log {
    daily
    missingok
    rotate 30
    compress
    delaycompress
    notifempty
    create 640 nginxuser nginxuser
    sharedscripts
    postrotate
        if [ -f /var/run/nginx.pid ]; then
            kill -USR1 \$(cat /var/run/nginx.pid)
        fi
    endscript
}
EOF

⑤. Start the Services

# Start Nginx
sudo systemctl enable nginx
sudo systemctl start nginx

# Start Keepalived
sudo systemctl enable keepalived
sudo systemctl start keepalived

# Verify service status
sudo systemctl status nginx
sudo systemctl status keepalived
sudo systemctl status fail2ban

# Check firewall rules
sudo ufw status verbose  # Ubuntu
# or
sudo firewall-cmd --list-all  # CentOS

⑥. Configure the Backend Application Servers

Deploy the application on each application server and make sure it listens on the expected port; a minimal Spring Boot controller matching this setup is sketched after the commands below.

# Configure the application server firewall
sudo ufw allow from 192.168.1.0/24 to any port 8080  # application port
sudo ufw allow from 192.168.1.0/24 to any port 22    # SSH

# Apply application-level hardening as well
# e.g. proper CORS headers, input validation, output encoding
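To make the backend instances (192.168.1.20+) easy to verify through the load balancer, each one can expose a /health endpoint and identify itself in a response header. This is only a minimal sketch, assuming a standard Spring Boot web application listening on port 8080; the class name and the X-Backend-Host header are illustrative choices, not part of the original setup.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.net.InetAddress;
import java.net.UnknownHostException;

@SpringBootApplication
@RestController
public class BackendApplication {

    public static void main(String[] args) {
        SpringApplication.run(BackendApplication.class, args);
    }

    // Used by manual checks; a timeout or non-2xx here makes
    // proxy_next_upstream in the Nginx config try another server.
    @GetMapping("/health")
    public ResponseEntity<String> health() {
        return ResponseEntity.ok()
                .header("X-Backend-Host", localHost())   // lets tests see which node answered
                .body("healthy\n");
    }

    @GetMapping("/")
    public ResponseEntity<String> index() {
        return ResponseEntity.ok()
                .header("X-Backend-Host", localHost())
                .body("Hello from " + localHost() + "\n");
    }

    private String localHost() {
        try {
            return InetAddress.getLocalHost().getHostAddress();  // e.g. 192.168.1.20
        } catch (UnknownHostException e) {
            return "unknown";
        }
    }
}

With server.port=8080 set in application.properties, this lines up with the upstream backend definition in the Nginx configuration above.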

⑦. Testing

Basic functional tests

# Test load balancing
for i in {1..10}; do
  curl http://192.168.1.100/
done
# Check whether responses come from different backend servers
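The same check can be scripted from the Java side. The sketch below is an assumption-laden illustration: it calls the VIP repeatedly with the JDK's built-in HTTP client and tallies the X-Backend-Host header emitted by the sample controller above; if your backends identify themselves differently, adapt the header name.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.HashMap;
import java.util.Map;

public class LoadBalanceCheck {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://192.168.1.100/"))   // the Keepalived VIP
                .GET()
                .build();

        Map<String, Integer> hits = new HashMap<>();
        for (int i = 0; i < 30; i++) {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            // X-Backend-Host is set by the sample backend controller; adjust if yours differs
            String backend = response.headers().firstValue("X-Backend-Host").orElse("unknown");
            hits.merge(backend, 1, Integer::sum);
        }
        // With least_conn and weights 3/2/1, the counts should roughly follow the weights
        hits.forEach((backend, count) -> System.out.println(backend + " -> " + count));
    }
}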

Security tests

# 1. Port scan
nmap -sS 192.168.1.10  # check open ports

# 2. SSL/TLS check (if enabled)
openssl s_client -connect 192.168.1.100:443 -servername your-domain.com

# 3. Security headers
curl -I http://192.168.1.100/

# 4. Brute-force protection
# Attempt several failed SSH logins and check whether the source gets banned
for i in {1..5}; do ssh wronguser@192.168.1.10; done
sudo fail2ban-client status sshd  # check whether the IP was banned

# 5. DDoS protection (test environment only)
# Use tools such as slowloris to exercise the connection limits

# 6. File permission check
namei -l /etc/nginx/nginx.conf
namei -l /etc/ssl/certs/nginx/nginx.key

# 7. Configuration syntax check
sudo nginx -t

High-availability tests

# Find the node currently holding the VIP
ip addr show eth0 | grep "192.168.1.100"

# Manually stop Nginx on the master node
ssh nginx-master "systemctl stop nginx"

# Check that the VIP has floated to the backup node
ip addr show eth0 | grep "192.168.1.100"  # should now appear on the backup

# Restore the master and check that the VIP floats back
ssh nginx-master "systemctl start nginx"
sleep 10
ip addr show eth0 | grep "192.168.1.100"  # should be back on the master

Performance and load tests

# Load test with ab
ab -n 10000 -c 100 http://192.168.1.100/

# More advanced test with wrk
wrk -t12 -c400 -d30s http://192.168.1.100/

Health check tests

# Stop one backend server
ssh app-server-1 "systemctl stop tomcat"

# Check that Nginx shifts traffic to the remaining servers
curl http://192.168.1.100/

# Inspect the Nginx status page
curl http://192.168.1.10/nginx_status

Log monitoring checks

# Nginx access log
tail -f /var/log/nginx/access.log

# Nginx error log
tail -f /var/log/nginx/error.log

# Keepalived log
tail -f /var/log/messages | grep keepalived

Alternatively, as a compact sequence:

# 1. Load balancing test
for i in {1..20}; do
  curl -s http://192.168.1.100/ | grep "Server IP"
done

# 2. High-availability test
# Stop Nginx on the master and check that the VIP floats
ssh nginx-master "sudo systemctl stop nginx"
sleep 5
curl http://192.168.1.100/health  # should still respond

# 3. Health check test
# Stop one backend server and check that traffic shifts
ssh app-server-1 "sudo systemctl stop tomcat"
for i in {1..10}; do curl http://192.168.1.100/; done

# 4. Performance and load test
ab -n 1000 -c 50 http://192.168.1.100/
wrk -t4 -c100 -d30s http://192.168.1.100/

Routine maintenance script

#!/bin/bash
# nginx_cluster_check.sh

VIP="192.168.1.100"
NGINX_NODES=("192.168.1.10" "192.168.1.11" "192.168.1.12")
APP_NODES=("192.168.1.20" "192.168.1.21" "192.168.1.22")

echo "=== Nginx cluster status check ==="
echo "Time: $(date)"

echo -e "\n1. VIP location:"
for node in "${NGINX_NODES[@]}"; do
  ssh $node "ip a | grep $VIP && echo \"VIP is on: $node\""
done

echo -e "\n2. Nginx processes:"
for node in "${NGINX_NODES[@]}"; do
  status=$(ssh $node "systemctl is-active nginx")
  echo "Nginx on $node: $status"
done

echo -e "\n3. Backend applications:"
for node in "${APP_NODES[@]}"; do
  if ssh $node "curl -s http://localhost:8080/health > /dev/null"; then
    echo "Application on $node: healthy"
  else
    echo "Application on $node: unhealthy"
  fi
done

Create a security monitoring script:

sudo tee /usr/local/bin/security_monitor.sh > /dev/null <<'EOF'
#!/bin/bash
echo "=== Security status check ==="
echo "Time: $(date)"
echo ""

echo "1. Failed login attempts:"
sudo grep "Failed password" /var/log/auth.log | tail -5  # Ubuntu
# or: sudo grep "Failed password" /var/log/secure | tail -5  # CentOS

echo ""
echo "2. Fail2ban status:"
sudo fail2ban-client status

echo ""
echo "3. Unusual connections:"
sudo netstat -tunap | grep ESTAB | awk '{print $5}' | cut -d: -f1 | sort | uniq -c | sort -n | tail -5

echo ""
echo "4. System resources:"
free -h && echo "" && df -h

echo ""
echo "5. Service status:"
systemctl status nginx | grep Active
systemctl status keepalived | grep Active
systemctl status fail2ban | grep Active
EOF

sudo chmod +x /usr/local/bin/security_monitor.sh

MySQL Cluster (Master-Slave Replication) + ShardingSphere

┌─────────────────────────────────────────────────────────────┐
│                     Application layer                        │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              ShardingSphere-Proxy                    │    │
│  │        (load-balanced cluster: 3 nodes)              │    │
│  └─────────────────────────┬───────────────────────────┘    │
└────────────────────────────┼─────────────────────────────────┘
                             │
┌────────────────────────────┼─────────────────────────────────┐
│                     Data access layer                         │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              ShardingSphere-Proxy                    │    │
│  │              (VIP: 192.168.2.100)                    │    │
│  └─────────────────────────┬───────────────────────────┘    │
└────────────────────────────┼─────────────────────────────────┘
                             │
┌────────────────────────────┼─────────────────────────────────┐
│          Data storage layer (multi-master, multi-slave)       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐           │
│  │MySQL master1│  │MySQL master2│  │MySQL master3│           │
│  │ (writable)  │  │ (writable)  │  │ (writable)  │           │
│  │ port 3306   │  │ port 3306   │  │ port 3306   │           │
│  └─────┬───────┘  └─────┬───────┘  └─────┬───────┘           │
│        │                │                │                   │
│        ▼                ▼                ▼                   │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐           │
│  │MySQL slave 1│  │MySQL slave 2│  │MySQL slave 3│           │
│  │ (read-only) │  │ (read-only) │  │ (read-only) │           │
│  │ port 3306   │  │ port 3306   │  │ port 3306   │           │
│  └─────────────┘  └─────────────┘  └─────────────┘           │
│                                                               │
│              Multi-master ring replication + GTID             │
└───────────────────────────────────────────────────────────────┘

┌───────────────────────────────────────────────────────────────┐
│                 Monitoring / management layer                  │
│  ┌─────────────┐  ┌─────────────┐  ┌──────────────┐           │
│  │ Prometheus  │  │   Grafana   │  │ Orchestrator │           │
│  │             │  │             │  │ (topology)   │           │
│  └─────────────┘  └─────────────┘  └──────────────┘           │
└───────────────────────────────────────────────────────────────┘

①. Server Planning

MySQL master nodes: 3 (192.168.2.10, 192.168.2.11, 192.168.2.12)
MySQL slave nodes: 3 (192.168.2.13, 192.168.2.14, 192.168.2.15)

ShardingSphere-Proxy nodes: 3 (192.168.2.20, 192.168.2.21, 192.168.2.22)
Monitoring node: 1 (192.168.2.30)

②. Configure the MySQL Cluster

Run on all MySQL nodes (masters and slaves):

# Install MySQL 8.0
sudo apt install mysql-server -y

# Configure MySQL multi-master replication
sudo tee /etc/mysql/mysql.conf.d/multi-master.cnf << 'EOF'
[mysqld]
# Server identity (unique per node)
server-id = 1  # change per node: 1,2,3,4,5,6

# Replication
log_bin = /var/log/mysql/mysql-bin.log
relay_log = /var/log/mysql/mysql-relay-bin.log
binlog_format = ROW
expire_logs_days = 10
max_binlog_size = 100M

# Multi-master settings
auto_increment_increment = 6  # number of nodes
auto_increment_offset = 1     # different per node: 1,2,3,4,5,6

# GTID
gtid_mode = ON
enforce_gtid_consistency = ON
log_slave_updates = ON

# Performance
innodb_buffer_pool_size = 2G
innodb_log_file_size = 512M
max_connections = 1000

# Network
bind-address = 0.0.0.0

# Character set
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci

# Parallel replication
slave_parallel_workers = 4
slave_parallel_type = LOGICAL_CLOCK

# Slave-only settings (uncomment on slave nodes only)
# read_only = 1
# super_read_only = 1
EOF

# Restart MySQL
sudo systemctl restart mysql
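Because server-id and auto_increment_offset must differ on every node, a quick sweep after editing the per-node files saves debugging later. This is a minimal JDBC sketch, assuming the sharding account created in step ③ and a MySQL JDBC driver (mysql-connector-j) on the classpath; the class name is illustrative.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MysqlConfigSweep {

    private static final String[] NODES = {
            "192.168.2.10", "192.168.2.11", "192.168.2.12",
            "192.168.2.13", "192.168.2.14", "192.168.2.15"
    };

    public static void main(String[] args) {
        String query = "SELECT @@server_id, @@gtid_mode, @@auto_increment_increment, "
                + "@@auto_increment_offset, @@read_only";
        for (String host : NODES) {
            String url = "jdbc:mysql://" + host + ":3306/?useSSL=false&allowPublicKeyRetrieval=true";
            try (Connection conn = DriverManager.getConnection(url, "sharding", "Sharding123!@#");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(query)) {
                if (rs.next()) {
                    // Every node should report a unique server_id/offset and gtid_mode = ON
                    System.out.printf("%s  server_id=%s gtid_mode=%s increment=%s offset=%s read_only=%s%n",
                            host, rs.getString(1), rs.getString(2),
                            rs.getString(3), rs.getString(4), rs.getString(5));
                }
            } catch (Exception e) {
                System.out.println(host + "  unreachable: " + e.getMessage());
            }
        }
    }
}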

③. Configure the Ring Replication Topology

Create the replication accounts and configure replication on each master:

# Create the replication and application users
sudo mysql -e "
CREATE USER 'repl'@'%' IDENTIFIED WITH mysql_native_password BY 'Repl123!@#';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;
CREATE USER 'sharding'@'%' IDENTIFIED WITH mysql_native_password BY 'Sharding123!@#';
GRANT ALL PRIVILEGES ON *.* TO 'sharding'@'%';
FLUSH PRIVILEGES;
"

# Ring replication: node1 -> node2 -> node3 -> node1

# On node 1
sudo mysql -e "
STOP SLAVE;
CHANGE MASTER TO
MASTER_HOST='192.168.2.12',
MASTER_USER='repl',
MASTER_PASSWORD='Repl123!@#',
MASTER_AUTO_POSITION=1;
START SLAVE;
"

# On node 2
sudo mysql -e "
STOP SLAVE;
CHANGE MASTER TO
MASTER_HOST='192.168.2.10',
MASTER_USER='repl',
MASTER_PASSWORD='Repl123!@#',
MASTER_AUTO_POSITION=1;
START SLAVE;
"

# On node 3
sudo mysql -e "
STOP SLAVE;
CHANGE MASTER TO
MASTER_HOST='192.168.2.11',
MASTER_USER='repl',
MASTER_PASSWORD='Repl123!@#',
MASTER_AUTO_POSITION=1;
START SLAVE;
"

# Configure the slave nodes
# Slave 1 replicates from master 1, and so on
sudo mysql -e "
STOP SLAVE;
CHANGE MASTER TO
MASTER_HOST='192.168.2.10',
MASTER_USER='repl',
MASTER_PASSWORD='Repl123!@#',
MASTER_AUTO_POSITION=1;
START SLAVE;
"

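Once the ring is up, it is worth confirming that a row written to one master propagates around it. The following JDBC sketch assumes the sharding_db.user table used in the tests later in this section already exists; the class name and the 10-second polling deadline are illustrative.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class ReplicationPropagationCheck {

    static Connection open(String host) throws Exception {
        String url = "jdbc:mysql://" + host + ":3306/sharding_db?useSSL=false&allowPublicKeyRetrieval=true";
        return DriverManager.getConnection(url, "sharding", "Sharding123!@#");
    }

    public static void main(String[] args) throws Exception {
        // Keep the marker small enough to fit an INT user_id column
        long marker = System.currentTimeMillis() % 1_000_000_000L;

        // Write a marker row on master 1
        try (Connection c = open("192.168.2.10");
             PreparedStatement ps = c.prepareStatement(
                     "INSERT INTO user (user_id, username, email) VALUES (?, ?, ?)")) {
            ps.setLong(1, marker);
            ps.setString(2, "repl_check_" + marker);
            ps.setString(3, "repl@example.com");
            ps.executeUpdate();
        }

        // Poll the other ring members until the row shows up (or 10 s pass)
        for (String host : new String[]{"192.168.2.11", "192.168.2.12"}) {
            boolean found = false;
            long deadline = System.currentTimeMillis() + 10_000;
            while (!found && System.currentTimeMillis() < deadline) {
                try (Connection c = open(host);
                     PreparedStatement ps = c.prepareStatement(
                             "SELECT COUNT(*) FROM user WHERE user_id = ?")) {
                    ps.setLong(1, marker);
                    try (ResultSet rs = ps.executeQuery()) {
                        rs.next();
                        found = rs.getInt(1) > 0;
                    }
                }
                if (!found) Thread.sleep(200);
            }
            System.out.println(host + (found ? ": replicated" : ": NOT replicated within 10s"));
        }
    }
}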
④. Configure ShardingSphere Multi-Source Routing

On the ShardingSphere-Proxy nodes:

# Configure the multi-master data sources
sudo tee conf/config-sharding.yaml << 'EOF'
databaseName: sharding_db

dataSources:
  master0:
    url: jdbc:mysql://192.168.2.10:3306/sharding_db?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true
    username: sharding
    password: Sharding123!@#
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1
  master1:
    url: jdbc:mysql://192.168.2.11:3306/sharding_db?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true
    username: sharding
    password: Sharding123!@#
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1
  master2:
    url: jdbc:mysql://192.168.2.12:3306/sharding_db?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true
    username: sharding
    password: Sharding123!@#
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1
  slave0:
    url: jdbc:mysql://192.168.2.13:3306/sharding_db?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true
    username: sharding
    password: Sharding123!@#
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1
  slave1:
    url: jdbc:mysql://192.168.2.14:3306/sharding_db?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true
    username: sharding
    password: Sharding123!@#
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1
  slave2:
    url: jdbc:mysql://192.168.2.15:3306/sharding_db?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true
    username: sharding
    password: Sharding123!@#
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1

rules:
- !LOAD_BALANCE
  dataSources:
    write_ds:
      dataSourceNames: master0,master1,master2
      loadBalancerName: round_robin
    read_ds:
      dataSourceNames: slave0,slave1,slave2
      loadBalancerName: round_robin
  loadBalancers:
    round_robin:
      type: ROUND_ROBIN

- !READWRITE_SPLITTING
  dataSources:
    primary_ds:
      writeDataSourceName: write_ds
      readDataSourceNames:
        - read_ds
      loadBalancerName: round_robin
  loadBalancers:
    round_robin:
      type: ROUND_ROBIN

- !SHARDING
  tables:
    user:
      actualDataNodes: primary_ds.user_${0..1}
      tableStrategy:
        standard:
          shardingColumn: user_id
          shardingAlgorithmName: user_inline
      keyGenerateStrategy:
        column: user_id
        keyGeneratorName: snowflake
    order:
      actualDataNodes: primary_ds.order_${0..3}
      tableStrategy:
        standard:
          shardingColumn: order_id
          shardingAlgorithmName: order_inline
      keyGenerateStrategy:
        column: order_id
        keyGeneratorName: snowflake
  shardingAlgorithms:
    user_inline:
      type: INLINE
      props:
        algorithm-expression: user_${user_id % 2}
    order_inline:
      type: INLINE
      props:
        algorithm-expression: order_${order_id % 4}
  keyGenerators:
    snowflake:
      type: SNOWFLAKE
EOF
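From the application's point of view, the proxy behaves like a single MySQL endpoint. The JDBC sketch below mirrors the manual tests later in this section; the proxy address 192.168.2.20, port 3307, and the root/root account are the values those tests assume, and the class name is illustrative. ShardingSphere routes the insert to the right user_ shard transparently.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

public class ShardingProxySmokeTest {

    public static void main(String[] args) throws Exception {
        // One of the ShardingSphere-Proxy nodes; in production a VIP usually sits in front of them
        String url = "jdbc:mysql://192.168.2.20:3307/sharding_db?useSSL=false&allowPublicKeyRetrieval=true";

        try (Connection conn = DriverManager.getConnection(url, "root", "root")) {
            // The logical table name is "user"; the proxy maps it to user_0 / user_1
            try (PreparedStatement ps = conn.prepareStatement(
                    "INSERT INTO user (user_id, username, email) VALUES (?, ?, ?)")) {
                ps.setLong(1, 42L);
                ps.setString(2, "proxy_test_user");
                ps.setString(3, "proxy_test@example.com");
                ps.executeUpdate();
            }

            // Reads are served by the read_ds group (the slaves) via the read/write-splitting rule
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT user_id, username FROM user WHERE user_id = 42")) {
                while (rs.next()) {
                    System.out.println(rs.getLong("user_id") + " -> " + rs.getString("username"));
                }
            }
        }
    }
}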

⑤. Configure Orchestrator for Automatic Failover

# Install Orchestrator
wget https://github.com/openark/orchestrator/releases/download/v3.2.6/orchestrator-3.2.6-linux-amd64.tar.gz
tar -xzf orchestrator-*.tar.gz

# Configure Orchestrator
sudo tee orchestrator.conf.json << 'EOF'
{
  "Debug": true,
  "EnableSyslog": false,
  "ListenAddress": ":3000",
  "MySQLTopologyUser": "orchestrator",
  "MySQLTopologyPassword": "Orchestrator123!@#",
  "MySQLTopologyCredentialsConfigFile": "",
  "MySQLTopologySSLPrivateKeyFile": "",
  "MySQLTopologySSLCertFile": "",
  "MySQLTopologySSLCAFile": "",
  "MySQLTopologySSLSkipVerify": true,
  "MySQLTopologyUseMutualTLS": false,
  "MySQLOrchestratorHost": "192.168.2.30",
  "MySQLOrchestratorPort": 3306,
  "MySQLOrchestratorDatabase": "orchestrator",
  "MySQLOrchestratorUser": "orchestrator",
  "MySQLOrchestratorPassword": "Orchestrator123!@#",
  "MySQLConnectTimeoutSeconds": 1,
  "DefaultInstancePort": 3306,
  "DiscoverByShowSlaveHosts": true,
  "InstancePollSeconds": 5,
  "UnseenInstanceForgetHours": 240,
  "SnapshotTopologiesIntervalHours": 0,
  "InstanceBulkOperationsWaitTimeoutSeconds": 10,
  "HostnameResolveMethod": "default",
  "MySQLHostnameResolveMethod": "@@hostname",
  "DetectClusterAliasQuery": "",
  "DetectClusterDomainQuery": "",
  "DetectInstanceAliasQuery": "",
  "DetectDataCenterQuery": "",
  "DetectPhysicalEnvironmentQuery": "",
  "DetectSemiSyncEnforcedQuery": "",
  "SupportFuzzyPoolHostnames": true,
  "RecoveryPeriodBlockSeconds": 3600,
  "RecoveryIgnoreHostnameFilters": [],
  "RecoverMasterClusterFilters": ["*"],
  "RecoverIntermediateMasterClusterFilters": ["*"],
  "PromotionIgnoreHostnameFilters": [],
  "DetachLostSlavesAfterMasterFailover": true,
  "ApplyMySQLPromotionAfterMasterFailover": true,
  "MasterFailoverLostInstancesDowntimeMinutes": 0,
  "PreventCrossDataCenterMasterFailover": false,
  "PreventCrossRegionMasterFailover": false,
  "DelayMasterPromotionIfSQLThreadNotUpToDate": false,
  "MasterFailoverDetachSlaveMasterHost": false,
  "MasterFailoverMustPromoteOtherMaster": true,
  "CoMasterRecoveryMustPromoteOtherCoMaster": true
}
EOF

⑥. Testing

Multi-master write test

# Write to several masters at the same time
mysql -u sharding -pSharding123!@# -h 192.168.2.10 -e "
INSERT INTO sharding_db.user (user_id, username, email) VALUES 
(1, 'user1_from_master1', 'user1@example.com');
"
mysql -u sharding -pSharding123!@# -h 192.168.2.11 -e "
INSERT INTO sharding_db.user (user_id, username, email) VALUES 
(2, 'user2_from_master2', 'user2@example.com');
"
mysql -u sharding -pSharding123!@# -h 192.168.2.12 -e "
INSERT INTO sharding_db.user (user_id, username, email) VALUES 
(3, 'user3_from_master3', 'user3@example.com');
"

# Check that the rows replicated everywhere
mysql -u sharding -pSharding123!@# -h 192.168.2.10 -e "SELECT * FROM sharding_db.user ORDER BY user_id;"
mysql -u sharding -pSharding123!@# -h 192.168.2.11 -e "SELECT * FROM sharding_db.user ORDER BY user_id;"
mysql -u sharding -pSharding123!@# -h 192.168.2.12 -e "SELECT * FROM sharding_db.user ORDER BY user_id;"

Master failover test

# Stop one master node
sudo systemctl stop mysql  # on 192.168.2.10

# Keep writing through ShardingSphere (writes should be routed to the remaining masters)
mysql -u root -proot -h 192.168.2.20 -P 3307 -e "
INSERT INTO user (user_id, username, email) VALUES 
(4, 'user4_after_failover', 'user4@example.com');
SELECT * FROM user ORDER BY user_id;
"

# Check whether Orchestrator detected the failure
curl http://192.168.2.30:3000/api/instances | jq '.[] | select(.ServerID == 1)'

# Bring the node back and verify that it catches up
sudo systemctl start mysql  # on 192.168.2.10
mysql -u sharding -pSharding123!@# -h 192.168.2.10 -e "SELECT * FROM sharding_db.user ORDER BY user_id;"

Load balancing test

# Write load balancing
for i in {1..20}; do
  mysql -u root -proot -h 192.168.2.20 -P 3307 -e "
    INSERT INTO user (user_id, username, email) VALUES ($((100 + i)), 'user_$i', 'user$i@example.com');
    SELECT @@server_id as server_id, 'write' as type;" | grep server_id
done

# Read load balancing
for i in {1..20}; do
  mysql -u root -proot -h 192.168.2.20 -P 3307 -e "
    SELECT @@server_id as server_id, 'read' as type FROM user LIMIT 1;" | grep server_id
done

Performance and load test

# Multi-threaded write test
sysbench /usr/share/sysbench/oltp_write_only.lua \
--mysql-host=192.168.2.20 \
--mysql-port=3307 \
--mysql-user=root \
--mysql-password=root \
--mysql-db=sharding_db \
--tables=4 \
--table-size=100000 \
--threads=32 \
--time=300 \
--report-interval=10 \
run

# Monitor load on each master
watch -n 1 "mysql -u sharding -pSharding123!@# -h 192.168.2.10 -e 'SHOW PROCESSLIST' | wc -l"

Data consistency check

# Compare row counts on every node
nodes=("192.168.2.10" "192.168.2.11" "192.168.2.12" "192.168.2.13" "192.168.2.14" "192.168.2.15")

for node in "${nodes[@]}"; do
  echo "Row counts on node $node:"
  mysql -u sharding -pSharding123!@# -h $node -e "
    SELECT 'user' as table_name, COUNT(*) as count FROM sharding_db.user
    UNION ALL
    SELECT 'order' as table_name, COUNT(*) as count FROM sharding_db.\`order\`;"
  echo "------------------------"
done

# Check GTID consistency
for node in "${nodes[@]}"; do
  echo "GTID state on node $node:"
  mysql -u sharding -pSharding123!@# -h $node -e "SHOW MASTER STATUS\G" | grep "Executed_Gtid_Set"
done

⑦. Monitoring and Alerting

# Key alerting rules
cat > /etc/prometheus/mysql_rules.yml << 'EOF'
groups:
- name: mysql-alerts
  rules:
  - alert: MySQLDown
    expr: up{job="mysql"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "MySQL instance down"
      description: "MySQL instance {{ $labels.instance }} is down"
  - alert: HighReplicationLag
    expr: mysql_slave_status_seconds_behind_master > 30
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High replication lag"
      description: "Replication lag on {{ $labels.instance }} is high: {{ $value }} seconds"
  - alert: TooManyConnections
    expr: mysql_global_status_threads_connected > 800
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Too many MySQL connections"
      description: "Too many connections on {{ $labels.instance }}: {{ $value }}"
EOF

Redis Cluster (Multi-Master/Multi-Slave with Voting-Based Failover)

┌─────────────────────────────────────────────────────────────────┐
│                     Redis Cluster (6 nodes)                     │
│                                                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐         │
│  │  Master 1   │    │  Master 2   │    │  Master 3   │         │
│  │ 192.168.3.10│    │ 192.168.3.11│    │ 192.168.3.12│         │
│  │  port 6379  │    │  port 6379  │    │  port 6379  │         │
│  └─────┬───────┘    └─────┬───────┘    └─────┬───────┘         │
│        │                  │                  │                 │
│        ▼                  ▼                  ▼                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐         │
│  │  Replica 1  │    │  Replica 2  │    │  Replica 3  │         │
│  │ 192.168.3.13│    │ 192.168.3.14│    │ 192.168.3.15│         │
│  │  port 6379  │    │  port 6379  │    │  port 6379  │         │
│  └─────────────┘    └─────────────┘    └─────────────┘         │
│                                                                 │
│                  Gossip protocol communication                  │
│                  Automatic failover by voting                   │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│                       Client access layer                       │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │               Redis cluster-aware client                 │   │
│  │            (automatic routing and redirection)           │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│                  Monitoring / management layer                  │
│  ┌─────────────┐  ┌─────────────┐  ┌──────────────────┐        │
│  │  Prometheus │  │   Grafana   │  │   RedisInsight   │        │
│  │  + exporter │  │             │  │ (visualization)  │        │
│  └─────────────┘  └─────────────┘  └──────────────────┘        │
└─────────────────────────────────────────────────────────────────┘

  • Automatic failover: master failures are handled through cluster voting
  • Data sharding: 16384 hash slots spread across the master nodes
  • High availability: every master has a corresponding replica
  • Linear scaling: new master/replica pairs can be added easily
  • Client transparency: clients follow redirections and route requests automatically

①. Server Planning

Redis master nodes: 3 (192.168.3.10, 192.168.3.11, 192.168.3.12)

Redis replica nodes: 3 (192.168.3.13, 192.168.3.14, 192.168.3.15)

Monitoring node: 1 (192.168.3.20) - optional

②. Port Planning

Redis service port: 6379

Cluster bus port: 16379

Monitoring ports: 9090 (Prometheus), 3000 (Grafana), 8001 (RedisInsight)

③. Base System Configuration

Run on all Redis nodes:

# Update the system
sudo apt update && sudo apt upgrade -y

# Set the hostname (adjust per node)
sudo hostnamectl set-hostname redis-node1  # node 1
sudo hostnamectl set-hostname redis-node2  # node 2
sudo hostnamectl set-hostname redis-node3  # node 3
sudo hostnamectl set-hostname redis-node4  # node 4
sudo hostnamectl set-hostname redis-node5  # node 5
sudo hostnamectl set-hostname redis-node6  # node 6

# Add all nodes to /etc/hosts
sudo tee -a /etc/hosts << EOF
192.168.3.10 redis-node1
192.168.3.11 redis-node2
192.168.3.12 redis-node3
192.168.3.13 redis-node4
192.168.3.14 redis-node5
192.168.3.15 redis-node6
192.168.3.20 monitor
EOF

# Configure the firewall
sudo ufw enable
sudo ufw allow 6379/tcp    # Redis service port
sudo ufw allow 16379/tcp   # Redis cluster bus port
sudo ufw allow 22/tcp      # SSH

④. Install Redis

Run on all Redis nodes:

# Install Redis
sudo apt install redis-server -y

# Create the Redis data directory
sudo mkdir -p /var/lib/redis/cluster
sudo chown redis:redis /var/lib/redis/cluster

# Back up the original configuration
sudo cp /etc/redis/redis.conf /etc/redis/redis.conf.backup

⑤. Configure the Redis Cluster Nodes

On every Redis node:

# Configure Redis for cluster mode
sudo tee /etc/redis/redis.conf << 'EOF'
# Basic settings
port 6379
bind 0.0.0.0
daemonize yes
pidfile /var/run/redis/redis-server.pid
logfile /var/log/redis/redis-server.log
dir /var/lib/redis/cluster

# Cluster settings
cluster-enabled yes
cluster-config-file nodes-6379.conf
cluster-node-timeout 15000
cluster-require-full-coverage no
cluster-replica-no-failover no

# Persistence
appendonly yes
appendfsync everysec
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

# Memory management
maxmemory 2gb
maxmemory-policy allkeys-lru

# Security
requirepass RedisCluster123!@#
masterauth RedisCluster123!@#

# Performance tuning
tcp-backlog 511
timeout 0
tcp-keepalive 300

# Slow log
slowlog-log-slower-than 10000
slowlog-max-len 128

# Latency monitoring
latency-monitor-threshold 100
EOF

# Create the cluster directory
sudo mkdir -p /etc/redis/cluster
sudo chown redis:redis /etc/redis/cluster

# Restart Redis
sudo systemctl restart redis-server
sudo systemctl enable redis-server

⑥. Create the Redis Cluster

Run the cluster creation command on any one node:

# Create a 3-master / 3-replica cluster with redis-cli
redis-cli -a RedisCluster123!@# --cluster create \
  192.168.3.10:6379 \
  192.168.3.11:6379 \
  192.168.3.12:6379 \
  192.168.3.13:6379 \
  192.168.3.14:6379 \
  192.168.3.15:6379 \
  --cluster-replicas 1
# Type "yes" to accept the proposed slot assignment

⑦. Verify the Cluster State

# Check cluster state
redis-cli -a RedisCluster123!@# -h 192.168.3.10 cluster nodes
redis-cli -a RedisCluster123!@# -h 192.168.3.10 cluster info

# Check slot assignment
redis-cli -a RedisCluster123!@# -h 192.168.3.10 cluster slots

⑧. Configure Monitoring

On the monitoring node (192.168.3.20):

# Install Prometheus
wget https://github.com/prometheus/prometheus/releases/download/v2.47.0/prometheus-2.47.0.linux-amd64.tar.gz
tar -xzf prometheus-*.tar.gz
cd prometheus-*

# Install the Redis exporter
wget https://github.com/oliver006/redis_exporter/releases/download/v1.55.0/redis_exporter-v1.55.0.linux-amd64.tar.gz
tar -xzf redis_exporter-*.tar.gz

# Start the Redis exporter (on every Redis node)
./redis_exporter -redis.addr redis://192.168.3.10:6379 -redis.password "RedisCluster123!@#" &

# Configure Prometheus
sudo tee prometheus.yml << 'EOF'
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'redis-cluster'
    static_configs:
      - targets:
        - '192.168.3.10:9121'
        - '192.168.3.11:9121'
        - '192.168.3.12:9121'
        - '192.168.3.13:9121'
        - '192.168.3.14:9121'
        - '192.168.3.15:9121'
    metrics_path: /metrics
EOF

# Start Prometheus
./prometheus --config.file=prometheus.yml &

⑨. Test Plan

Basic cluster functionality

# Connect to the cluster and test reads and writes
redis-cli -a RedisCluster123!@# -c -h 192.168.3.10

# In the Redis CLI, run:
SET key1 "value1"
SET key2 "value2" 
SET key3 "value3"
GET key1
GET key2
GET key3

# Multi-key commands must hit a single slot in cluster mode, so use a hash tag
MSET {user:1000}:name "Alice" {user:1000}:email "alice@example.com" {user:1000}:age "30"
MGET {user:1000}:name {user:1000}:email {user:1000}:age
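From Java, a cluster-aware client performs the same MOVED/ASK handling that redis-cli -c does. A minimal sketch using the Lettuce client is shown below; the dependency io.lettuce:lettuce-core and the class name are assumptions, and any of the six nodes can serve as the seed.

import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;

public class RedisClusterSmokeTest {

    public static void main(String[] args) {
        // Any cluster node works as a seed; the client discovers the rest of the topology
        RedisURI seed = RedisURI.builder()
                .withHost("192.168.3.10")
                .withPort(6379)
                .withPassword("RedisCluster123!@#".toCharArray())
                .build();

        RedisClusterClient client = RedisClusterClient.create(seed);
        try (StatefulRedisClusterConnection<String, String> connection = client.connect()) {
            RedisAdvancedClusterCommands<String, String> commands = connection.sync();

            // The client routes each key to the master owning its slot
            commands.set("order:1001", "pending");
            System.out.println("order:1001 = " + commands.get("order:1001"));
        } finally {
            client.shutdown();
        }
    }
}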

Automatic redirection test

# A key written through one node can be read through any other
redis-cli -a RedisCluster123!@# -c -h 192.168.3.10 SET mykey "test value"
redis-cli -a RedisCluster123!@# -c -h 192.168.3.11 GET mykey
redis-cli -a RedisCluster123!@# -c -h 192.168.3.12 GET mykey

Master failover test

# Inspect the current cluster state
redis-cli -a RedisCluster123!@# -h 192.168.3.10 cluster nodes | grep master

# Simulate a master failure (stop Redis on one master)
sudo systemctl stop redis-server  # run on one of the master nodes

# Give the cluster ~30 seconds to complete the failover
sleep 30

# Confirm that the replica has been promoted to master
redis-cli -a RedisCluster123!@# -h 192.168.3.11 cluster nodes | grep -v fail

# Confirm the cluster is still usable
redis-cli -a RedisCluster123!@# -c -h 192.168.3.11 SET after_failover "cluster is working"
redis-cli -a RedisCluster123!@# -c -h 192.168.3.11 GET after_failover

# Bring the failed node back
sudo systemctl start redis-server  # run on the failed node

# Check that it rejoins as a replica
sleep 10
redis-cli -a RedisCluster123!@# -h 192.168.3.10 cluster nodes | grep slave

Durability and consistency test

# Write a batch of keys
for i in {1..1000}; do
  redis-cli -a RedisCluster123!@# -c -h 192.168.3.10 SET "key:$i" "value:$i" > /dev/null
done

# Restart all nodes
for node in {10..15}; do
  ssh 192.168.3.$node "sudo systemctl restart redis-server"
done

# Wait for the cluster to recover
sleep 30

# Verify that the data survived
for i in {1..100}; do
  value=$(redis-cli -a RedisCluster123!@# -c -h 192.168.3.10 GET "key:$i")
  if [ "$value" != "value:$i" ]; then
    echo "Inconsistent data: key:$i"
  fi
done

Performance and load test

# Load test with redis-benchmark
redis-benchmark -a RedisCluster123!@# -h 192.168.3.10 -p 6379 -c 50 -n 100000 -t set,get

# Cluster-mode load test
redis-benchmark -a RedisCluster123!@# -h 192.168.3.10 -p 6379 -c 100 -n 1000000 -P 10 --cluster

Slot migration test

# Check the current slot distribution
redis-cli -a RedisCluster123!@# -h 192.168.3.10 cluster slots

# Manually move slots (example)
# First obtain the node IDs
node_id=$(redis-cli -a RedisCluster123!@# -h 192.168.3.10 cluster myid)
target_node_id=$(redis-cli -a RedisCluster123!@# -h 192.168.3.11 cluster myid)

# Reshard 1000 slots from one node to the other
redis-cli -a RedisCluster123!@# --cluster reshard 192.168.3.10:6379 \
  --cluster-from $node_id \
  --cluster-to $target_node_id \
  --cluster-slots 1000 \
  --cluster-yes

Network partition test

# Simulate a partition (drop traffic from one node)
sudo iptables -A INPUT -s 192.168.3.11 -j DROP  # run on 192.168.3.10

# Wait for the cluster to detect the failure
sleep 60

# Check the cluster state
redis-cli -a RedisCluster123!@# -h 192.168.3.12 cluster nodes | grep fail

# Restore the network
sudo iptables -D INPUT -s 192.168.3.11 -j DROP  # run on 192.168.3.10

# Check that the cluster heals itself
sleep 30
redis-cli -a RedisCluster123!@# -h 192.168.3.10 cluster nodes | grep -v fail

⑩. Monitoring and High-Availability Verification

# Create a cluster health check script
sudo tee /usr/local/bin/redis_cluster_check.sh << 'EOF'
#!/bin/bash
echo "=== Redis cluster health check ==="
echo "Time: $(date)"

# Check every node
nodes=("192.168.3.10" "192.168.3.11" "192.168.3.12" "192.168.3.13" "192.168.3.14" "192.168.3.15")

for node in "${nodes[@]}"; do
  if redis-cli -a RedisCluster123!@# -h $node ping | grep -q "PONG"; then
    status="✓ online"
  else
    status="✗ offline"
  fi
  role=$(redis-cli -a RedisCluster123!@# -h $node cluster nodes | grep myself | awk '{print $3}')
  echo "Node $node: $status ($role)"
done

# Cluster state
echo -e "\nCluster state:"
redis-cli -a RedisCluster123!@# -h 192.168.3.10 cluster info | grep -E "(cluster_state|cluster_slots_assigned|cluster_size)"

# Master/replica relationships
echo -e "\nMaster/replica relationships:"
redis-cli -a RedisCluster123!@# -h 192.168.3.10 cluster nodes | grep -E "(master|slave)" | head -10

# Memory usage
echo -e "\nMemory usage:"
for node in "${nodes[@]}"; do
  memory=$(redis-cli -a RedisCluster123!@# -h $node info memory | grep used_memory_human | cut -d: -f2)
  echo "Node $node memory: $memory"
done

echo -e "\n=== Check complete ==="
EOF

sudo chmod +x /usr/local/bin/redis_cluster_check.sh

# Simulate multiple simultaneous node failures
# Stop two master nodes
sudo systemctl stop redis-server  # run on 192.168.3.10 and 192.168.3.11

# Check whether the cluster is still usable (one master should survive)
sleep 45
redis-cli -a RedisCluster123!@# -c -h 192.168.3.12 SET ha_test "cluster is highly available"
redis-cli -a RedisCluster123!@# -c -h 192.168.3.12 GET ha_test

# Bring the nodes back
sudo systemctl start redis-server  # run on the failed nodes

# Check automatic recovery
sleep 30
/usr/local/bin/redis_cluster_check.sh

ES Cluster

┌─────────────────────────────────────────────────────────────────┐
│                 Elasticsearch Cluster (7 nodes)                 │
│                                                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐         │
│  │ Master+data │    │ Master+data │    │ Master+data │         │
│  │ 192.168.4.10│    │ 192.168.4.11│    │ 192.168.4.12│         │
│  │ ports 9200  │    │ ports 9200  │    │ ports 9200  │         │
│  │ / 9300      │    │ / 9300      │    │ / 9300      │         │
│  └─────┬───────┘    └─────┬───────┘    └─────┬───────┘         │
│        │                  │                  │                 │
│        ▼                  ▼                  ▼                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐         │
│  │ Dedicated   │    │  Data node  │    │ Coordinating│         │
│  │ master node │    │ 192.168.4.14│    │ node        │         │
│  │ 192.168.4.13│    │ ports 9200  │    │ 192.168.4.15│         │
│  │ ports 9200  │    │ / 9300      │    │ ports 9200  │         │
│  │ / 9300      │    │             │    │ / 9300      │         │
│  └─────────────┘    └─────────────┘    └─────────────┘         │
│                                                                 │
│                  Dedicated master election                      │
│                  Data sharding and replicas                     │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│                       Client access layer                       │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │             Load balancer (Nginx/HAProxy)                │   │
│  │                 192.168.4.100:9200                       │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│                  Monitoring / management layer                  │
│  ┌─────────────┐  ┌─────────────┐  ┌────────────────┐          │
│  │  Prometheus │  │   Grafana   │  │ Cerebro/Kibana │          │
│  │  + exporter │  │             │  │ (visualization)│          │
│  └─────────────┘  └─────────────┘  └────────────────┘          │
└─────────────────────────────────────────────────────────────────┘

  • High availability: multiple master-eligible nodes and replica shards
  • High performance: dedicated node roles and load balancing
  • Scalability: new nodes are easy to add
  • Monitoring and alerting: a complete monitoring stack
  • Data safety: snapshot and restore

①. Server and Port Planning

Server planning

  • Master + data nodes: 3 (192.168.4.10, 192.168.4.11, 192.168.4.12)
  • Dedicated master node: 1 (192.168.4.13) - cluster management
  • Data node: 1 (192.168.4.14) - pure data storage
  • Coordinating node: 1 (192.168.4.15) - request routing and aggregation
  • Load balancer: 1 (192.168.4.100) - optional
  • Monitoring node: 1 (192.168.4.20) - optional

Port planning

  • REST API port: 9200
  • Inter-node transport port: 9300
  • Monitoring ports: 9090 (Prometheus), 3000 (Grafana), 9000 (Cerebro), 5601 (Kibana)

②. Base System Configuration

Run on all nodes:

# Update the system
sudo apt update && sudo apt upgrade -y

# Set the hostname (adjust per node)
sudo hostnamectl set-hostname es-node1  # node 1; use es-node2, es-node3, es-master, es-data, es-coord on the others

# Add all nodes to /etc/hosts
sudo tee -a /etc/hosts << EOF
192.168.4.10 es-node1
192.168.4.11 es-node2
192.168.4.12 es-node3
192.168.4.13 es-master
192.168.4.14 es-data
192.168.4.15 es-coord
192.168.4.20 monitor
EOF

# Tune kernel parameters
sudo tee -a /etc/sysctl.conf << EOF
vm.max_map_count=262144
vm.swappiness=1
net.core.somaxconn=1024
EOF

sudo sysctl -p

# File descriptor and thread limits
sudo tee -a /etc/security/limits.conf << EOF
elasticsearch soft nofile 65536
elasticsearch hard nofile 65536
elasticsearch soft nproc 4096
elasticsearch hard nproc 4096
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited
EOF

# Configure the firewall
sudo ufw enable
sudo ufw allow 9200/tcp    # REST API
sudo ufw allow 9300/tcp    # inter-node transport
sudo ufw allow 22/tcp      # SSH

③. Install Java and Elasticsearch

Run on all nodes:

# Install Java 17 (optional: Elasticsearch 8.x ships with a bundled JDK and requires Java 17+ if an external one is used)
sudo apt install openjdk-17-jdk -y

# Verify the Java version
java -version

# Download and install Elasticsearch
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.11.0-amd64.deb
sudo dpkg -i elasticsearch-8.11.0-amd64.deb

# Create the data directory
sudo mkdir -p /data/elasticsearch
sudo chown elasticsearch:elasticsearch /data/elasticsearch

# Create the log directory
sudo mkdir -p /var/log/elasticsearch
sudo chown elasticsearch:elasticsearch /var/log/elasticsearch

④. Configure the Elasticsearch Cluster Nodes

Master + data node configuration (192.168.4.10-12):

sudo tee /etc/elasticsearch/elasticsearch.yml << 'EOF'
# Cluster name
cluster.name: enterprise-es-cluster

# Node name (change per node)
node.name: es-node1

# Node roles
node.roles: [ master, data, ingest ]

# Network
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300

# Discovery
discovery.seed_hosts: ["192.168.4.10", "192.168.4.11", "192.168.4.12", "192.168.4.13"]
cluster.initial_master_nodes: ["es-node1", "es-node2", "es-node3", "es-master"]

# Paths
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch

# Memory
bootstrap.memory_lock: true

# Security (enable in production)
xpack.security.enabled: false
# xpack.security.transport.ssl.enabled: true
# xpack.security.http.ssl.enabled: true

# Monitoring
xpack.monitoring.collection.enabled: true

# Gateway recovery (8.x uses the *_data_nodes variants of these settings)
gateway.recover_after_data_nodes: 3
gateway.expected_data_nodes: 4
gateway.recover_after_time: 5m

# Thread pools
thread_pool.write.queue_size: 1000
thread_pool.search.queue_size: 1000

# Index safety
action.destructive_requires_name: true
EOF

Dedicated master node configuration (192.168.4.13):

sudo tee /etc/elasticsearch/elasticsearch.yml << 'EOF'
cluster.name: enterprise-es-cluster
node.name: es-master
node.roles: [ master ]

network.host: 0.0.0.0
http.port: 9200
transport.port: 9300

discovery.seed_hosts: ["192.168.4.10", "192.168.4.11", "192.168.4.12", "192.168.4.13"]
cluster.initial_master_nodes: ["es-node1", "es-node2", "es-node3", "es-master"]

path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true

# Dedicated master tuning
cluster.fault_detection.leader_check.interval: 2s
cluster.fault_detection.follower_check.interval: 2s
EOF

Data node configuration (192.168.4.14):

sudo tee /etc/elasticsearch/elasticsearch.yml << 'EOF'
cluster.name: enterprise-es-cluster
node.name: es-data
node.roles: [ data ]

network.host: 0.0.0.0
http.port: 9200
transport.port: 9300

discovery.seed_hosts: ["192.168.4.10", "192.168.4.11", "192.168.4.12", "192.168.4.13"]
cluster.initial_master_nodes: ["es-node1", "es-node2", "es-node3", "es-master"]

path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true

# Data node tuning
indices.fielddata.cache.size: 40%
indices.queries.cache.size: 10%
EOF

Coordinating node configuration (192.168.4.15):

sudo tee /etc/elasticsearch/elasticsearch.yml << 'EOF'
cluster.name: enterprise-es-cluster
node.name: es-coord
node.roles: [ ingest ]

network.host: 0.0.0.0
http.port: 9200
transport.port: 9300

discovery.seed_hosts: ["192.168.4.10", "192.168.4.11", "192.168.4.12", "192.168.4.13"]
cluster.initial_master_nodes: ["es-node1", "es-node2", "es-node3", "es-master"]

path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true

# Coordinating/ingest node tuning
http.max_content_length: 100mb
EOF

⑤. Configure JVM Parameters

sudo tee /etc/elasticsearch/jvm.options.d/enterprise.options << 'EOF'
# JVM heap size (no more than ~50% of physical memory)
-Xms8g
-Xmx8g

# GC settings
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:InitiatingHeapOccupancyPercent=35

# GC logging
-Xlog:gc*,gc+age=trace,safepoint:file=/var/log/elasticsearch/gc.log:utctime,pid,tags:filecount=32,filesize=64m

# Direct memory
-XX:MaxDirectMemorySize=4294967296
-Des.cgroups.hierarchy.override=/

# Performance tuning
-XX:+AlwaysPreTouch
-XX:+UseStringDeduplication
-XX:+UseCompressedOops
EOF

⑥. Start the Elasticsearch Cluster

Run on all nodes:

# Enable and start the systemd service
sudo systemctl daemon-reload
sudo systemctl enable elasticsearch
sudo systemctl start elasticsearch

# Check service status
sudo systemctl status elasticsearch
sudo journalctl -u elasticsearch -f

# Check cluster health
curl -X GET "http://localhost:9200/_cluster/health?pretty"

⑦. Configure the Load Balancer (Optional)

On the load balancer node (192.168.4.100):

# Install Nginx
sudo apt install nginx -y

# Configure load balancing
sudo tee /etc/nginx/conf.d/elasticsearch.conf << 'EOF'
upstream elasticsearch {
    server 192.168.4.10:9200 weight=3;
    server 192.168.4.11:9200 weight=3;
    server 192.168.4.12:9200 weight=3;
    server 192.168.4.15:9200 weight=1;  # coordinating node
    keepalive 32;
}

server {
    listen 9200;

    location / {
        proxy_pass http://elasticsearch;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 30s;
    }

    access_log /var/log/nginx/es_access.log;
    error_log /var/log/nginx/es_error.log;
}
EOF

sudo systemctl restart nginx

⑧. Test Plan

Cluster health checks

# Cluster health
curl -X GET "http://192.168.4.10:9200/_cluster/health?pretty"

# Node status
curl -X GET "http://192.168.4.10:9200/_cat/nodes?v"

# Shard allocation
curl -X GET "http://192.168.4.10:9200/_cat/shards?v"

# Elected master
curl -X GET "http://192.168.4.10:9200/_cat/master?v"
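The same checks can be embedded in an application's startup or smoke tests. A minimal sketch using the JDK's built-in HTTP client against the REST API (addresses taken from this section; in production you would point it at the load balancer and enable security first):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class EsHealthCheck {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // _cluster/health reports green/yellow/red plus node and shard counts
        HttpRequest health = HttpRequest.newBuilder()
                .uri(URI.create("http://192.168.4.10:9200/_cluster/health?pretty"))
                .GET()
                .build();
        System.out.println(client.send(health, HttpResponse.BodyHandlers.ofString()).body());

        // _cat/nodes shows which node currently holds the elected-master role
        HttpRequest nodes = HttpRequest.newBuilder()
                .uri(URI.create("http://192.168.4.10:9200/_cat/nodes?v"))
                .GET()
                .build();
        System.out.println(client.send(nodes, HttpResponse.BodyHandlers.ofString()).body());
    }
}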

Data read/write tests

# Create a test index
curl -X PUT "http://192.168.4.10:9200/test-index" -H 'Content-Type: application/json' -d'
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 2,
    "refresh_interval": "1s"
  },
  "mappings": {
    "properties": {
      "timestamp": { "type": "date" },
      "message": {
        "type": "text",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "level": { "type": "keyword" },
      "host": { "type": "ip" }
    }
  }
}'

# Insert test data
for i in {1..1000}; do
  curl -X POST "http://192.168.4.10:9200/test-index/_doc" -H 'Content-Type: application/json' -d'
  {
    "timestamp": "'$(date -u +"%Y-%m-%dT%H:%M:%SZ")'",
    "message": "Test log message number '$i'",
    "level": "INFO",
    "host": "192.168.1.'$((i % 10))'"
  }'
done

# Refresh the index and verify the document count
curl -X POST "http://192.168.4.10:9200/test-index/_refresh"
curl -X GET "http://192.168.4.10:9200/test-index/_count"

# Query test
curl -X GET "http://192.168.4.10:9200/test-index/_search" -H 'Content-Type: application/json' -d'
{
  "query": { "match": { "message": "test" } },
  "aggs": { "levels": { "terms": { "field": "level" } } }
}'

Node failover test

# Find the current master
MASTER_NODE=$(curl -s "http://192.168.4.10:9200/_cat/master" | awk '{print $4}')
echo "Current master: $MASTER_NODE"

# Stop the master
ssh $MASTER_NODE "sudo systemctl stop elasticsearch"

# Wait for the failover (30-60 seconds)
sleep 45

# Check the newly elected master
NEW_MASTER=$(curl -s "http://192.168.4.11:9200/_cat/master" | awk '{print $4}')
echo "New master: $NEW_MASTER"

# Verify that the cluster is still usable
curl -X GET "http://$NEW_MASTER:9200/_cluster/health?pretty"
curl -X GET "http://$NEW_MASTER:9200/test-index/_count"

# Bring the failed node back
ssh $MASTER_NODE "sudo systemctl start elasticsearch"

# Wait for it to rejoin
sleep 30
curl -X GET "http://$NEW_MASTER:9200/_cat/nodes?v"

Data consistency test

# Insert a known document
curl -X POST "http://192.168.4.10:9200/consistency-test/_doc/1" -H 'Content-Type: application/json' -d'
{
  "id": 1,
  "data": "consistency check data",
  "timestamp": "'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'"
}'

# Read it back through different nodes
for node in 192.168.4.{10,11,12,14,15}; do
  echo "Reading from node $node:"
  curl -s "http://$node:9200/consistency-test/_doc/1" | grep -E '"found"|"data"'
  echo "---"
done

Performance and load test

# Using an Elasticsearch stress-test tool
python3 -m pip install elasticsearch-stress-test

# Create the stress test configuration
cat > stress_test.yml << 'EOF'
cluster:
  nodes:
    - http://192.168.4.10:9200
    - http://192.168.4.11:9200
    - http://192.168.4.12:9200
operations:
  - name: index
    operation-type: index
    index: stress-test
    clients: 10
    requests: 10000
    body: |
      {
        "timestamp": "{{ now }}",
        "message": "Stress test message {{ increment }}",
        "level": "{{ random['INFO','WARN','ERROR'] }}",
        "value": {{ random(1, 1000) }}
      }
  - name: search
    operation-type: search
    index: stress-test
    clients: 5
    requests: 5000
    body: |
      {
        "query": { "match_all": {} },
        "size": 10
      }
EOF

# Run the stress test
estress stress_test.yml

# HTTP load test with ab
ab -n 10000 -c 100 -T 'application/json' -p test_data.json http://192.168.4.10:9200/test-index/_search

Snapshot and restore test

# Register a snapshot repository
curl -X PUT "http://192.168.4.10:9200/_snapshot/backup_repository" -H 'Content-Type: application/json' -d'
{
  "type": "fs",
  "settings": {
    "location": "/data/backups",
    "compress": true
  }
}'

# Create a snapshot
curl -X PUT "http://192.168.4.10:9200/_snapshot/backup_repository/snapshot_1?wait_for_completion=true"

# Delete the test index
curl -X DELETE "http://192.168.4.10:9200/test-index"

# Restore from the snapshot
curl -X POST "http://192.168.4.10:9200/_snapshot/backup_repository/snapshot_1/_restore?wait_for_completion=true"

# Verify the restore
curl -X GET "http://192.168.4.10:9200/test-index/_count"

⑨. Monitoring Configuration

Install and configure monitoring

# Install the Prometheus exporter
wget https://github.com/prometheus-community/elasticsearch_exporter/releases/download/v1.6.0/elasticsearch_exporter-1.6.0.linux-amd64.tar.gz
tar -xzf elasticsearch_exporter-*.tar.gz

# Start the exporter
./elasticsearch_exporter --es.uri=http://192.168.4.10:9200 --web.listen-address=:9114 &

# Configure the Grafana dashboard
# Import the Elasticsearch monitoring dashboard (ID: 2322)

Cluster health check script

sudo tee /usr/local/bin/es_cluster_check.sh << 'EOF'
#!/bin/bash
echo "=== Elasticsearch cluster health check ==="
echo "Time: $(date)"

# Check each node
nodes=("192.168.4.10" "192.168.4.11" "192.168.4.12" "192.168.4.13" "192.168.4.14" "192.168.4.15")

for node in "${nodes[@]}"; do
  if curl -s "http://$node:9200" > /dev/null; then
    status="✓ online"
    health=$(curl -s "http://$node:9200/_cluster/health" | grep -o '"status":"[^"]*"' | cut -d'"' -f4)
    node_count=$(curl -s "http://$node:9200/_cat/nodes" | wc -l)
  else
    status="✗ offline"
    health="unknown"
    node_count="unknown"
  fi
  echo "Node $node: $status | health: $health | nodes: $node_count"
done

# Shard status
echo -e "\nShard status:"
curl -s "http://192.168.4.10:9200/_cat/health?v"

echo -e "\nIndex status:"
curl -s "http://192.168.4.10:9200/_cat/indices?v" | head -10

echo -e "\nMemory usage:"
for node in "${nodes[@]}"; do
  if curl -s "http://$node:9200" > /dev/null; then
    memory=$(curl -s "http://$node:9200/_cat/nodes?v&h=name,heap.percent,ram.percent" | grep $node)
    echo "Node $node memory: $memory"
  fi
done

echo -e "\n=== Check complete ==="
EOF

sudo chmod +x /usr/local/bin/es_cluster_check.sh

MongoDB Cluster

┌─────────────────────────────────────────────────────────────────┐
│                    MongoDB sharded cluster                      │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                 Mongos router nodes (3)                  │   │
│  │                 192.168.5.10:27017                       │   │
│  │                 192.168.5.11:27017                       │   │
│  │                 192.168.5.12:27017                       │   │
│  └─────────────────────────┬───────────────────────────────┘   │
└────────────────────────────┼────────────────────────────────────┘
                             │
┌────────────────────────────┼────────────────────────────────────┐
│             Config server replica set (3 nodes)                  │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │Config srv 1 │    │Config srv 2 │    │Config srv 3 │          │
│  │ 192.168.5.20│    │ 192.168.5.21│    │ 192.168.5.22│          │
│  │  port 27019 │    │  port 27019 │    │  port 27019 │          │
│  └─────────────┘    └─────────────┘    └─────────────┘          │
└───────────────────────────────────────────────────────────────────┘
                             │
┌────────────────────────────┼────────────────────────────────────┐
│          Shard replica sets (3 shards, 3 nodes each)             │
│                                                                   │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐          │
│  │Shard1 primary│   │Shard2 primary│   │Shard3 primary│          │
│  │ 192.168.5.30 │   │ 192.168.5.40 │   │ 192.168.5.50 │          │
│  │  port 27018  │   │  port 27018  │   │  port 27018  │          │
│  └─────┬────────┘   └─────┬────────┘   └─────┬────────┘          │
│        ▼                  ▼                  ▼                   │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐          │
│  │Shard1 second.│   │Shard2 second.│   │Shard3 second.│          │
│  │ 192.168.5.31 │   │ 192.168.5.41 │   │ 192.168.5.51 │          │
│  │  port 27018  │   │  port 27018  │   │  port 27018  │          │
│  └─────┬────────┘   └─────┬────────┘   └─────┬────────┘          │
│        ▼                  ▼                  ▼                   │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐          │
│  │Shard1 arbiter│   │Shard2 arbiter│   │Shard3 arbiter│          │
│  │ 192.168.5.32 │   │ 192.168.5.42 │   │ 192.168.5.52 │          │
│  │  port 27018  │   │  port 27018  │   │  port 27018  │          │
│  └──────────────┘   └──────────────┘   └──────────────┘          │
└───────────────────────────────────────────────────────────────────┘

┌───────────────────────────────────────────────────────────────────┐
│                   Monitoring / management layer                    │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────┐            │
│  │  Prometheus │  │   Grafana   │  │ MongoDB Compass │            │
│  │  + exporter │  │             │  │ (visualization) │            │
│  └─────────────┘  └─────────────┘  └─────────────────┘            │
└───────────────────────────────────────────────────────────────────┘

  • Horizontal scaling: sharding spreads data across shards
  • High availability: replica sets provide redundancy and automatic failover
  • Load balancing: multiple Mongos nodes distribute requests
  • Data security: authentication and keyfile-based internal auth
  • Monitoring and alerting: a complete monitoring stack
  • Backup and restore: reliable data protection

①. Environment Preparation

Server planning

  • Mongos router nodes: 3 (192.168.5.10, 192.168.5.11, 192.168.5.12)
  • Config server replica set: 3 (192.168.5.20, 192.168.5.21, 192.168.5.22)
  • Shard 1 replica set: 3 (192.168.5.30, 192.168.5.31, 192.168.5.32)
  • Shard 2 replica set: 3 (192.168.5.40, 192.168.5.41, 192.168.5.42)
  • Shard 3 replica set: 3 (192.168.5.50, 192.168.5.51, 192.168.5.52)
  • Monitoring node: 1 (192.168.5.100) - optional

Port planning

  • Mongos router port: 27017
  • Shard node port: 27018
  • Config server port: 27019
  • Monitoring ports: 9216 (MongoDB exporter), 9090 (Prometheus), 3000 (Grafana)

②. Base System Configuration

Run on all nodes:

# Update the system
sudo apt update && sudo apt upgrade -y

# Set the hostname (adjust per node)
sudo hostnamectl set-hostname mongos1  # on mongos node 1
sudo hostnamectl set-hostname cfg1     # on config server 1
sudo hostnamectl set-hostname shard1a  # on shard 1 primary

# Add all nodes to /etc/hosts
sudo tee -a /etc/hosts << EOF
# Mongos nodes
192.168.5.10 mongos1
192.168.5.11 mongos2
192.168.5.12 mongos3
# Config servers
192.168.5.20 cfg1
192.168.5.21 cfg2
192.168.5.22 cfg3
# Shard 1
192.168.5.30 shard1a
192.168.5.31 shard1b
192.168.5.32 shard1c
# Shard 2
192.168.5.40 shard2a
192.168.5.41 shard2b
192.168.5.42 shard2c
# Shard 3
192.168.5.50 shard3a
192.168.5.51 shard3b
192.168.5.52 shard3c
192.168.5.100 monitor
EOF

# Tune kernel parameters
sudo tee -a /etc/sysctl.conf << EOF
# Network
net.core.somaxconn=65535
net.ipv4.tcp_max_syn_backlog=65535
net.ipv4.tcp_keepalive_time=300
# Memory
vm.swappiness=1
vm.dirty_ratio=15
vm.dirty_background_ratio=5
# Filesystem
fs.file-max=2097152
fs.aio-max-nr=1048576
EOF

sudo sysctl -p

# File descriptor limits
sudo tee -a /etc/security/limits.conf << EOF
* soft nofile 64000
* hard nofile 64000
* soft nproc 64000
* hard nproc 64000
EOF

# Configure the firewall
sudo ufw enable
sudo ufw allow 27017/tcp  # Mongos
sudo ufw allow 27018/tcp  # shard nodes
sudo ufw allow 27019/tcp  # config servers
sudo ufw allow 22/tcp     # SSH

③. Install MongoDB

Run on all nodes:

# Import the MongoDB GPG key
wget -qO - https://www.mongodb.org/static/pgp/server-6.0.asc | sudo apt-key add -

# Add the MongoDB repository
echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/6.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-6.0.list

# Install MongoDB
sudo apt update
sudo apt install -y mongodb-org

# Create the data directories
sudo mkdir -p /data/db
sudo mkdir -p /data/log
sudo chown -R mongodb:mongodb /data

# Disable Transparent Huge Pages (THP)
sudo tee /etc/systemd/system/disable-thp.service << 'EOF'
[Unit]
Description=Disable Transparent Huge Pages (THP)
DefaultDependencies=no
After=sysinit.target local-fs.target
Before=mongod.service

[Service]
Type=oneshot
ExecStart=/bin/sh -c 'echo never | tee /sys/kernel/mm/transparent_hugepage/enabled > /dev/null'
ExecStart=/bin/sh -c 'echo never | tee /sys/kernel/mm/transparent_hugepage/defrag > /dev/null'

[Install]
WantedBy=basic.target
EOF

sudo systemctl enable disable-thp
sudo systemctl start disable-thp

④. Configure the Replica Set Nodes

Config server replica set (192.168.5.20-22):

sudo tee /etc/mongod.conf << 'EOF'
# Storage
storage:
  dbPath: /data/db
  journal:
    enabled: true
  wiredTiger:
    engineConfig:
      cacheSizeGB: 1
      journalCompressor: snappy
    collectionConfig:
      blockCompressor: snappy
    indexConfig:
      prefixCompression: true

# Network
net:
  port: 27019
  bindIp: 0.0.0.0
  maxIncomingConnections: 10000

# Process management
processManagement:
  fork: true
  pidFilePath: /var/run/mongodb/mongod.pid

# Logging
systemLog:
  destination: file
  logAppend: true
  path: /data/log/mongod.log
  logRotate: reopen

# Replica set
replication:
  replSetName: configReplSet
  oplogSizeMB: 1024

# Config server role
sharding:
  clusterRole: configsvr

# Security
security:
  keyFile: /etc/mongodb-keyfile
  authorization: enabled

# Profiling
operationProfiling:
  mode: slowOp
  slowOpThresholdMs: 100
  slowOpSampleRate: 0.5
EOF

# Create the keyfile (use the same keyfile on all nodes)
openssl rand -base64 756 | sudo tee /etc/mongodb-keyfile > /dev/null
sudo chown mongodb:mongodb /etc/mongodb-keyfile
sudo chmod 400 /etc/mongodb-keyfile

Shard server configuration (shard 1 shown; 192.168.5.30-32):

sudo tee /etc/mongod.conf << 'EOF'
# Storage
storage:
  dbPath: /data/db
  journal:
    enabled: true
  wiredTiger:
    engineConfig:
      cacheSizeGB: 4  # tune to roughly 50-60% of system memory
      journalCompressor: snappy
    collectionConfig:
      blockCompressor: snappy
    indexConfig:
      prefixCompression: true

# Network
net:
  port: 27018
  bindIp: 0.0.0.0
  maxIncomingConnections: 20000

# Process management
processManagement:
  fork: true
  pidFilePath: /var/run/mongodb/mongod.pid

# Logging
systemLog:
  destination: file
  logAppend: true
  path: /data/log/mongod.log
  logRotate: reopen

# Replica set
replication:
  replSetName: shard1
  oplogSizeMB: 2048  # larger oplog for sharded workloads

# Shard role
sharding:
  clusterRole: shardsvr

# Security
security:
  keyFile: /etc/mongodb-keyfile
  authorization: enabled

# Tuning
setParameter:
  enableLocalhostAuthBypass: false
  ttlMonitorEnabled: true
  logLevel: 1
EOF

# Copy the keyfile to the shard nodes
sudo scp /etc/mongodb-keyfile root@192.168.5.30:/etc/
sudo scp /etc/mongodb-keyfile root@192.168.5.31:/etc/
sudo scp /etc/mongodb-keyfile root@192.168.5.32:/etc/

Mongos router node configuration (192.168.5.10-12):

sudo tee /etc/mongos.conf << 'EOF'
# Network
net:
  port: 27017
  bindIp: 0.0.0.0
  maxIncomingConnections: 30000

# Process management
processManagement:
  fork: true
  pidFilePath: /var/run/mongodb/mongos.pid

# Logging
systemLog:
  destination: file
  logAppend: true
  path: /data/log/mongos.log
  logRotate: reopen

# Sharding
sharding:
  configDB: configReplSet/192.168.5.20:27019,192.168.5.21:27019,192.168.5.22:27019

# Security
security:
  keyFile: /etc/mongodb-keyfile

# Tuning
setParameter:
  transactionLifetimeLimitSeconds: 60
  maxSessions: 1000000
EOF

# Copy the keyfile to the mongos nodes
sudo scp /etc/mongodb-keyfile root@192.168.5.10:/etc/
sudo scp /etc/mongodb-keyfile root@192.168.5.11:/etc/
sudo scp /etc/mongodb-keyfile root@192.168.5.12:/etc/

⑤、启动和初始化集群

启动配置服务器副本集:

# 在所有配置服务器上启动
sudo systemctl start mongod
sudo systemctl enable mongod

# 在cfg1上初始化副本集
mongosh --host 192.168.5.20 --port 27019 --eval "
rs.initiate({
  _id: 'configReplSet',
  configsvr: true,
  members: [
    { _id: 0, host: '192.168.5.20:27019' },
    { _id: 1, host: '192.168.5.21:27019' },
    { _id: 2, host: '192.168.5.22:27019' }
  ]
})
"

启动分片副本集(以分片1为例):

# 在分片1的所有节点上启动
sudo systemctl start mongod
sudo systemctl enable mongod

# 在shard1a上初始化副本集(注意:仲裁节点的priority必须为0)
mongosh --host 192.168.5.30 --port 27018 --eval "
rs.initiate({
  _id: 'shard1',
  members: [
    { _id: 0, host: '192.168.5.30:27018', priority: 3 },
    { _id: 1, host: '192.168.5.31:27018', priority: 2 },
    { _id: 2, host: '192.168.5.32:27018', priority: 0, arbiterOnly: true }
  ]
})
"

# 对其他分片重复相同操作

启动Mongos路由节点:

# 在所有Mongos节点上启动
sudo systemctl start mongos
sudo systemctl enable mongos

配置分片集群:

# 连接到任意Mongos节点
mongosh --host 192.168.5.10 --port 27017

# 添加分片到集群
sh.addShard("shard1/192.168.5.30:27018,192.168.5.31:27018,192.168.5.32:27018")
sh.addShard("shard2/192.168.5.40:27018,192.168.5.41:27018,192.168.5.42:27018")
sh.addShard("shard3/192.168.5.50:27018,192.168.5.51:27018,192.168.5.52:27018")

# 启用分片数据库
sh.enableSharding("testdb")

# 创建用户(在admin数据库)
use admin
db.createUser({
  user: "clusterAdmin",
  pwd: "ClusterAdmin123!@#",
  roles: [ { role: "clusterAdmin", db: "admin" } ]
})

db.createUser({
  user: "appUser",
  pwd: "AppUser123!@#",
  roles: [ { role: "readWrite", db: "testdb" } ]
})
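
到这里分片集群的基本配置就完成了。下面补充一段示意性的Java客户端代码(不属于上述脚本,主机地址与 appUser 账号沿用上文,类名为本文假设),演示应用程序如何通过 Mongos 路由节点读写分片集群;这里用 MongoCredential 传递凭据,避免把 !@# 这类特殊字符直接拼进连接URI时的转义问题:

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

import java.util.Arrays;

public class MongosClientDemo {
    public static void main(String[] args) {
        // appUser 是上文在 admin 库中创建、对 testdb 具有读写权限的账号
        MongoCredential credential =
                MongoCredential.createCredential("appUser", "admin", "AppUser123!@#".toCharArray());

        // 只需列出 Mongos 路由节点,分片拓扑对客户端透明
        MongoClientSettings settings = MongoClientSettings.builder()
                .credential(credential)
                .applyToClusterSettings(builder -> builder.hosts(Arrays.asList(
                        new ServerAddress("192.168.5.10", 27017),
                        new ServerAddress("192.168.5.11", 27017),
                        new ServerAddress("192.168.5.12", 27017))))
                .build();

        try (MongoClient client = MongoClients.create(settings)) {
            MongoCollection<Document> users = client.getDatabase("testdb").getCollection("users");
            // 写入时带上分片键 userId,由 Mongos 按分片键把请求路由到对应分片
            users.insertOne(new Document("userId", 1L).append("username", "demo"));
            System.out.println("users count = " + users.countDocuments());
        }
    }
}

只要文档携带分片键,Mongos 会自动完成路由与聚合,应用侧无需关心数据具体落在哪个分片上。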

⑥、测试方案

集群状态检查

# 检查配置服务器状态
mongosh --host 192.168.5.20 --port 27019 --eval "rs.status()"

# 检查分片状态
mongosh --host 192.168.5.30 --port 27018 --eval "rs.status()"

# 检查集群状态
mongosh --host 192.168.5.10 --port 27017 --username clusterAdmin --password ClusterAdmin123!@# --eval "
sh.status()
db.adminCommand({ listShards: 1 })
db.adminCommand({ listDatabases: 1 })
"

分片功能测试

# 创建分片集合和分片键
mongosh --host 192.168.5.10 --port 27017 --username clusterAdmin --password ClusterAdmin123!@# --eval "
use testdb

// 创建分片集合
db.createCollection('users')
db.createCollection('orders')

// 创建分片键
sh.shardCollection('testdb.users', { userId: 1 })
sh.shardCollection('testdb.orders', { orderId: 'hashed' })

// 查看分片信息
db.users.getShardDistribution()
db.orders.getShardDistribution()
"

# 插入测试数据
mongosh --host 192.168.5.10 --port 27017 --username appUser --password AppUser123!@# --eval "
use testdb

// 批量插入用户数据
for (let i = 1; i <= 10000; i++) {
  db.users.insert({
    userId: i,
    username: 'user' + i,
    email: 'user' + i + '@example.com',
    createdAt: new Date(),
    data: 'x'.repeat(1000) // 添加一些数据量
  })
  if (i % 1000 === 0) {
    print('Inserted ' + i + ' users')
  }
}

// 批量插入订单数据
for (let i = 1; i <= 50000; i++) {
  db.orders.insert({
    orderId: i,
    userId: Math.floor(Math.random() * 10000) + 1,
    amount: Math.random() * 1000,
    status: ['pending', 'completed', 'cancelled'][Math.floor(Math.random() * 3)],
    createdAt: new Date()
  })
  if (i % 5000 === 0) {
    print('Inserted ' + i + ' orders')
  }
}
"

数据分布验证

# 检查数据分布
mongosh --host 192.168.5.10 --port 27017 --username clusterAdmin --password ClusterAdmin123!@# --eval "
use testdb

// 检查数据分布
print('Users collection distribution:')
db.users.aggregate([
  { $group: { _id: null, count: { $sum: 1 } } }
])

print('Orders collection distribution:')
db.orders.aggregate([
  { $group: { _id: null, count: { $sum: 1 } } }
])

// 检查分片数据平衡
db.adminCommand({ balancerStatus: 1 })

// 检查块分布
db.adminCommand({ listShards: 1 }).shards.forEach(function(shard) {
  print('Shard: ' + shard._id)
  printjson(db.adminCommand({ dataSize: 'testdb.users', keyPattern: { userId: 1 }, min: { userId: 0 }, max: { userId: 10000 } }))
})
"

故障转移测试

# 测试分片主节点故障转移
echo "当前分片1主节点:"
mongosh --host 192.168.5.30 --port 27018 --eval "rs.isMaster().primary"

# 停止分片1主节点
ssh shard1a "sudo systemctl stop mongod"

# 等待故障转移(30-60秒)
sleep 45

echo "新的分片1主节点:"
mongosh --host 192.168.5.31 --port 27018 --eval "rs.isMaster().primary"

# 测试集群是否仍然可用
mongosh --host 192.168.5.10 --port 27017 --username appUser --password AppUser123!@# --eval "
use testdb
db.users.count()
db.orders.count()
"

# 恢复故障节点
ssh shard1a "sudo systemctl start mongod"

# 等待节点重新加入集群
sleep 30
mongosh --host 192.168.5.31 --port 27018 --eval "rs.status()"

性能压力测试

# 使用mongo-perf进行压力测试
python3 -m pip install pymongo

# 创建压力测试脚本
cat > stress_test.py << 'EOF'
from pymongo import MongoClient
from concurrent.futures import ThreadPoolExecutor
import time
import random


class MongoDBStressTest:
    def __init__(self, uri):
        self.client = MongoClient(uri)
        self.db = self.client.testdb

    def write_test(self, num_operations):
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=50) as executor:
            for i in range(num_operations):
                executor.submit(self._write_operation, i)
        return time.time() - start_time

    def _write_operation(self, i):
        self.db.stress_test.insert_one({
            'data': 'x' * 1000,
            'timestamp': time.time(),
            'counter': i
        })

    def read_test(self, num_operations):
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=50) as executor:
            for i in range(num_operations):
                executor.submit(self._read_operation)
        return time.time() - start_time

    def _read_operation(self):
        # list() 强制遍历游标,否则查询不会真正执行
        list(self.db.stress_test.find().limit(100))


if __name__ == "__main__":
    # 连接到Mongos路由节点(密码中的 !@# 等特殊字符需URL编码,认证库为admin)
    test = MongoDBStressTest(
        "mongodb://appUser:AppUser123%21%40%23@192.168.5.10:27017,192.168.5.11:27017,192.168.5.12:27017/testdb?authSource=admin"
    )

    # 写入测试
    write_time = test.write_test(10000)
    print(f"Write 10000 documents: {write_time:.2f} seconds")
    print(f"Write throughput: {10000/write_time:.2f} ops/sec")

    # 读取测试
    read_time = test.read_test(10000)
    print(f"Read 10000 operations: {read_time:.2f} seconds")
    print(f"Read throughput: {10000/read_time:.2f} ops/sec")
EOF

python3 stress_test.py

备份恢复测试

# 创建备份
mongosh --host 192.168.5.10 --port 27017 --username clusterAdmin --password ClusterAdmin123!@# --eval "
use admin
db.adminCommand({
  createBackup: 1,
  backupDbPath: '/data/backup',
  backupName: 'full_backup'
})
"

# 模拟数据丢失
mongosh --host 192.168.5.10 --port 27017 --username appUser --password AppUser123!@# --eval "
use testdb
db.critical_data.drop()
"

# 恢复备份
mongosh --host 192.168.5.10 --port 27017 --username clusterAdmin --password ClusterAdmin123!@# --eval "
use admin
db.adminCommand({
  restoreBackup: 1,
  backupDbPath: '/data/backup',
  backupName: 'full_backup'
})
"

⑦、监控配置

安装MongoDB exporter

# 在监控节点上安装
wget https://github.com/percona/mongodb_exporter/releases/download/v0.39.0/mongodb_exporter-0.39.0.linux-amd64.tar.gz
tar -xzf mongodb_exporter-*.tar.gz

# 启动exporter
./mongodb_exporter --mongodb.uri="mongodb://clusterAdmin:ClusterAdmin123!@#192.168.5.10:27017,192.168.5.11:27017,192.168.5.12:27017/admin?replicaSet=configReplSet" --web.listen-address=":9216" &

集群健康检查脚本

sudo tee /usr/local/bin/mongodb_cluster_check.sh << 'EOF'
#!/bin/bash

echo "=== MongoDB集群健康检查 ==="
echo "时间: $(date)"

# 检查Mongos节点
mongos_nodes=("192.168.5.10" "192.168.5.11" "192.168.5.12")
for node in "${mongos_nodes[@]}"; do
  if mongosh --host $node --port 27017 --username clusterAdmin --password ClusterAdmin123!@# --eval "db.adminCommand({ ping: 1 })" --quiet; then
    status="✓ 在线"
  else
    status="✗ 离线"
  fi
  echo "Mongos $node: $status"
done

# 检查配置服务器
config_nodes=("192.168.5.20" "192.168.5.21" "192.168.5.22")
for node in "${config_nodes[@]}"; do
  if mongosh --host $node --port 27019 --username clusterAdmin --password ClusterAdmin123!@# --eval "rs.status().ok" --quiet; then
    status="✓ 在线"
    state=$(mongosh --host $node --port 27019 --username clusterAdmin --password ClusterAdmin123!@# --eval "rs.status().myState" --quiet)
  else
    status="✗ 离线"
    state="未知"
  fi
  echo "配置服务器 $node: $status (状态: $state)"
done

# 检查分片状态
shards=("shard1" "shard2" "shard3")
for shard in "${shards[@]}"; do
  primary=$(mongosh --host 192.168.5.10 --port 27017 --username clusterAdmin --password ClusterAdmin123!@# --eval "db.adminCommand({ listShards: 1 })" --quiet | grep -A5 "$shard" | grep primary | cut -d'"' -f4)
  if [ -n "$primary" ]; then
    status=$(mongosh --host $primary --port 27018 --username clusterAdmin --password ClusterAdmin123!@# --eval "rs.status().ok" --quiet)
    echo "分片 $shard: ✓ 在线 (主节点: $primary)"
  else
    echo "分片 $shard: ✗ 离线"
  fi
done

# 检查平衡器状态
balancer_status=$(mongosh --host 192.168.5.10 --port 27017 --username clusterAdmin --password ClusterAdmin123!@# --eval "sh.getBalancerState()" --quiet)
echo "平衡器状态: $balancer_status"

# 检查集群统计信息
echo -e "\n集群统计:"
mongosh --host 192.168.5.10 --port 27017 --username clusterAdmin --password ClusterAdmin123!@# --eval "
print('数据库数量: ' + db.adminCommand({ listDatabases: 1 }).databases.length)
print('分片数量: ' + db.adminCommand({ listShards: 1 }).shards.length)
print('总数据大小: ' + (db.stats().dataSize / 1024 / 1024).toFixed(2) + ' MB')
" --quiet

echo -e "\n=== 检查完成 ==="
EOF

sudo chmod +x /usr/local/bin/mongodb_cluster_check.sh

RocketMQ集群

┌─────────────────────────────────────────────────────────────────┐
│                   RocketMQ集群                                  │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                   NameServer集群 (3节点)                 │   │
│  │                 192.168.6.10:9876                       │   │
│  │                 192.168.6.11:9876                       │   │
│  │                 192.168.6.12:9876                       │   │
│  └─────────────────────────┬───────────────────────────────┘   │
│                            │                                   │
└────────────────────────────┼───────────────────────────────────┘│
┌────────────────────────────┼───────────────────────────────────┐
│                   Broker集群 (2主2从)                          │
│                                                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐         │
│  │ Broker主节点│    │ Broker主节点│    │ Broker从节点│         │
│  │ 192.168.6.20│    │ 192.168.6.21│    │ 192.168.6.22│         │
│  │  端口: 10909│    │  端口: 10909│    │  端口: 10909│         │
│  │      10911  │    │      10911  │    │      10911  │         │
│  └─────┬───────┘    └─────┬───────┘    └─────┬───────┘         │
│        │                  │                  │                 │
│        ▼                  ▼                  ▼                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐         │
│  │ Broker从节点│    │ 控制台节点  │    │  监控节点   │         │
│  │ 192.168.6.23│    │ 192.168.6.30│    │ 192.168.6.40│         │
│  │  端口: 10909│    │  端口: 8080 │    │  端口: 9090 │         │
│  │      10911  │    │             │    │      3000   │         │
│  └─────────────┘    └─────────────┘    └─────────────┘         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘┌─────────────────────────────────────────────────────────────────┐
│                   客户端访问层                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                  Producer/Consumer                      │   │
│  │              (自动发现和负载均衡)                        │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘┌─────────────────────────────────────────────────────────────────┐
│                   监控管理层                                    │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐            │
│  │  Prometheus │  │   Grafana   │  │ RocketMQ控制台│          │
│  │   + exporter│  │             │  │             │            │
│  └─────────────┘  └─────────────┘  └─────────────┘            │
└─────────────────────────────────────────────────────────────────┘

高可用性:多主多从架构,自动故障转移

高性能:优化的JVM和系统参数配置

可扩展性:易于添加新的Broker节点

监控告警:完整的监控体系

数据安全:同步复制确保数据不丢失

管理便捷:Web控制台可视化管理

①、环境准备

服务器规划

  • NameServer节点: 3台 (192.168.6.10, 192.168.6.11, 192.168.6.12)
  • Broker主节点: 2台 (192.168.6.20, 192.168.6.21)
  • Broker从节点: 2台 (192.168.6.22, 192.168.6.23)
  • 控制台节点: 1台 (192.168.6.30)
  • 监控节点: 1台 (192.168.6.40)

端口规划

  • NameServer端口: 9876
  • Broker端口: 10909 (VIP), 10911 (主端口), 10912 (HA端口)
  • 控制台端口: 8080
  • 监控端口: 9090 (Prometheus), 3000 (Grafana)

②、系统基础配置

在所有节点上执行:

# 更新系统
sudo apt update && sudo apt upgrade -y

# 配置主机名
sudo hostnamectl set-hostname nameserver1  # 在NameServer1上
sudo hostnamectl set-hostname broker-master1  # 在Broker主节点1上
sudo hostnamectl set-hostname broker-slave1   # 在Broker从节点1上

# 配置所有节点的/etc/hosts
sudo tee -a /etc/hosts << EOF
# NameServer节点
192.168.6.10 nameserver1
192.168.6.11 nameserver2
192.168.6.12 nameserver3

# Broker节点
192.168.6.20 broker-master1
192.168.6.21 broker-master2
192.168.6.22 broker-slave1
192.168.6.23 broker-slave2

# 其他节点
192.168.6.30 console
192.168.6.40 monitor
EOF

# 优化系统参数
sudo tee -a /etc/sysctl.conf << EOF
# 网络优化
net.core.somaxconn=65535
net.ipv4.tcp_max_syn_backlog=65535
net.ipv4.tcp_keepalive_time=300
net.ipv4.tcp_keepalive_intvl=30
net.ipv4.tcp_keepalive_probes=3

# 内存优化
vm.swappiness=10
vm.dirty_ratio=20
vm.dirty_background_ratio=10
vm.overcommit_memory=1

# 文件系统优化
fs.file-max=1000000
fs.aio-max-nr=1048576
EOF

sudo sysctl -p

# 配置文件描述符限制
sudo tee -a /etc/security/limits.conf << EOF
* soft nofile 1000000
* hard nofile 1000000
* soft nproc 1000000
* hard nproc 1000000
* soft memlock unlimited
* hard memlock unlimited
EOF

# 配置防火墙
sudo ufw enable
sudo ufw allow 9876/tcp    # NameServer
sudo ufw allow 10909/tcp   # Broker VIP
sudo ufw allow 10911/tcp   # Broker主端口
sudo ufw allow 10912/tcp   # Broker HA端口
sudo ufw allow 8080/tcp    # 控制台
sudo ufw allow 22/tcp      # SSH

③、安装Java环境

在所有节点上执行:

# 安装Java 11
sudo apt install openjdk-11-jdk -y

# 验证Java版本
java -version

# 设置JAVA_HOME
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64' >> ~/.bashrc
echo 'export PATH=$JAVA_HOME/bin:$PATH' >> ~/.bashrc
source ~/.bashrc

④、安装和配置RocketMQ

在所有节点上下载RocketMQ:

# 下载RocketMQ
wget https://archive.apache.org/dist/rocketmq/4.9.7/rocketmq-all-4.9.7-bin-release.zip
unzip rocketmq-all-4.9.7-bin-release.zip
sudo mv rocketmq-all-4.9.7-bin-release /opt/rocketmq
sudo chown -R $USER:$USER /opt/rocketmq

# 创建数据和日志目录
sudo mkdir -p /data/rocketmq/{logs,store,commitlog,consumequeue,index}
sudo chown -R $USER:$USER /data/rocketmq

配置NameServer节点 (192.168.6.10-12):

# 创建NameServer启动脚本
sudo tee /opt/rocketmq/bin/start-nameserver.sh << 'EOF'
#!/bin/bash
export ROCKETMQ_HOME=/opt/rocketmq
export JAVA_OPTS="-Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC -verbose:gc -Xloggc:/data/rocketmq/logs/rocketmq_gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"nohup sh ${ROCKETMQ_HOME}/bin/mqnamesrv > /data/rocketmq/logs/namesrv.log 2>&1 &
echo "NameServer started!"
EOFsudo chmod +x /opt/rocketmq/bin/start-nameserver.sh# 创建停止脚本
sudo tee /opt/rocketmq/bin/stop-nameserver.sh << 'EOF'
#!/bin/bash
export ROCKETMQ_HOME=/opt/rocketmq
sh ${ROCKETMQ_HOME}/bin/mqshutdown namesrv
EOFsudo chmod +x /opt/rocketmq/bin/stop-nameserver.sh

配置Broker主节点1 (192.168.6.20):

sudo tee /opt/rocketmq/conf/2m-2s-sync/broker-master1.properties << 'EOF'
# 集群名称
brokerClusterName=EnterpriseRocketMQCluster
# Broker名称
brokerName=broker-master1
# Broker ID (0表示主节点)
brokerId=0

# NameServer地址
namesrvAddr=192.168.6.10:9876;192.168.6.11:9876;192.168.6.12:9876

# 监听端口
listenPort=10911
# VIP通道端口
vipChannelEnabled=true

# 存储路径
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/commitlog
storePathConsumeQueue=/data/rocketmq/consumequeue
storePathIndex=/data/rocketmq/index
storePathCheckpoint=/data/rocketmq/checkpoint
abortFile=/data/rocketmq/abort

# 存储配置
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
maxHashSlotNum=5000000
maxIndexNum=20000000

# 消息存储时间(小时)
fileReservedTime=72
deleteWhen=04
diskMaxUsedSpaceRatio=88

# HA配置
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH

# 从节点地址
brokerIP1=192.168.6.20
haListenPort=10912
haSendHeartbeatInterval=5000
haHousekeepingInterval=20000
haTransferBatchSize=32768

# 复制配置
transferMsgByHeap=true
maxTransferCountOnMessageInMemory=4096
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInDisk=128
maxTransferBytesOnMessageInDisk=65536

# 线程池配置
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128
queryMessageThreadPoolNums=32
adminBrokerThreadPoolNums=16
clientManageThreadPoolNums=32
consumerManageThreadPoolNums=32

# 消息大小限制
maxMessageSize=4194304

# 刷盘配置
flushCommitLogTimed=false
flushIntervalCommitLog=1000
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushConsumeQueueThoroughInterval=60000
flushCommitLogThoroughInterval=60000

# 性能优化
useReentrantLockWhenPutMessage=true
waitTimeMillsInSendQueue=3000
waitTimeMillsInPullQueue=5000
waitTimeMillsInHeartbeatQueue=31000
waitTimeMillsInTransactionQueue=3000

# 日志配置
rocketmqHome=/opt/rocketmq
rocketmqLogLevel=INFO
rocketmqLogRoot=/data/rocketmq/logs
rocketmqLogMaxIndex=20
EOF

配置Broker从节点1 (192.168.6.22):

sudo tee /opt/rocketmq/conf/2m-2s-sync/broker-slave1.properties << 'EOF'
# 集群名称
brokerClusterName=EnterpriseRocketMQCluster
# Broker名称(与主节点对应)
brokerName=broker-master1
# Broker ID (非0表示从节点)
brokerId=1

# NameServer地址
namesrvAddr=192.168.6.10:9876;192.168.6.11:9876;192.168.6.12:9876

# 监听端口
listenPort=10911
vipChannelEnabled=true

# 存储路径
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/commitlog
storePathConsumeQueue=/data/rocketmq/consumequeue
storePathIndex=/data/rocketmq/index
storePathCheckpoint=/data/rocketmq/checkpoint
abortFile=/data/rocketmq/abort

# 存储配置
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
maxHashSlotNum=5000000
maxIndexNum=20000000

# 消息存储时间
fileReservedTime=72
deleteWhen=04
diskMaxUsedSpaceRatio=88

# HA配置
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

# 主节点地址
brokerIP1=192.168.6.22
haListenPort=10912

# 复制配置
transferMsgByHeap=true

# 线程池配置
sendMessageThreadPoolNums=64
pullMessageThreadPoolNums=64
queryMessageThreadPoolNums=16

# 消息大小限制
maxMessageSize=4194304

# 刷盘配置
flushCommitLogTimed=false
flushIntervalCommitLog=1000
EOF

创建Broker启动脚本:

sudo tee /opt/rocketmq/bin/start-broker.sh << 'EOF'
#!/bin/bash
export ROCKETMQ_HOME=/opt/rocketmq
export JAVA_OPTS="-Xms8g -Xmx8g -Xmn4g -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=150 -XX:InitiatingHeapOccupancyPercent=45 -XX:G1ReservePercent=25 -XX:SurvivorRatio=8 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=4 -XX:+UseStringDeduplication -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/data/rocketmq/logs/broker_gc.log"CONF_FILE=$1
if [ -z "$CONF_FILE" ]; thenecho "Usage: $0 <config-file>"exit 1
finohup sh ${ROCKETMQ_HOME}/bin/mqbroker -c ${ROCKETMQ_HOME}/conf/2m-2s-sync/${CONF_FILE} > /data/rocketmq/logs/broker.log 2>&1 &
echo "Broker started with config: $CONF_FILE"
EOFsudo chmod +x /opt/rocketmq/bin/start-broker.shsudo tee /opt/rocketmq/bin/stop-broker.sh << 'EOF'
#!/bin/bash
export ROCKETMQ_HOME=/opt/rocketmq
sh ${ROCKETMQ_HOME}/bin/mqshutdown broker
EOFsudo chmod +x /opt/rocketmq/bin/stop-broker.sh

⑤、启动集群

启动NameServer集群:

# 在所有NameServer节点上执行
cd /opt/rocketmq/bin
./start-nameserver.sh

# 检查NameServer状态
tail -f /data/rocketmq/logs/namesrv.log

启动Broker集群:

# 在Broker主节点1上执行
cd /opt/rocketmq/bin
./start-broker.sh broker-master1.properties

# 在Broker主节点2上执行
./start-broker.sh broker-master2.properties

# 在Broker从节点1上执行
./start-broker.sh broker-slave1.properties

# 在Broker从节点2上执行
./start-broker.sh broker-slave2.properties

# 检查Broker状态
tail -f /data/rocketmq/logs/broker.log

⑥、安装和配置控制台

在控制台节点 (192.168.6.30) 上:

# 下载RocketMQ控制台
wget https://github.com/apache/rocketmq-dashboard/archive/refs/tags/rocketmq-dashboard-1.0.0.zip
unzip rocketmq-dashboard-1.0.0.zip
cd rocketmq-dashboard-rocketmq-dashboard-1.0.0

# 配置控制台
sudo tee src/main/resources/application.properties << 'EOF'
server.port=8080

# NameServer地址
rocketmq.config.namesrvAddr=192.168.6.10:9876;192.168.6.11:9876;192.168.6.12:9876

# 数据保留时间(天)
rocketmq.config.dataPath=/tmp/rocketmq-console/data
rocketmq.config.enableDashBoardCollect=true

# 登录配置(可选)
rocketmq.config.loginRequired=false
rocketmq.config.username=admin
rocketmq.config.password=admin123

# 多环境支持
spring.application.name=rocketmq-dashboard
spring.profiles.active=dev

# 日志配置
logging.config=classpath:logback-spring.xml
logging.level.com.apache.rocketmq.console=INFO

# 线程池配置
server.tomcat.max-threads=200
server.tomcat.min-spare-threads=20

# 连接超时
rocketmq.config.timeoutMillis=3000
EOF

# 构建和启动控制台
mvn clean package -Dmaven.test.skip=true
nohup java -jar target/rocketmq-dashboard-1.0.0.jar > /data/rocketmq-console/console.log 2>&1 &

⑦、测试方案

集群状态检查

# 检查NameServer状态
curl http://192.168.6.10:9876/namesrv/addrlist

# 检查Broker状态
cd /opt/rocketmq/bin
./mqadmin clusterList -n 192.168.6.10:9876

# 查看集群信息
./mqadmin clusterList -n 192.168.6.10:9876
./mqadmin brokerStatus -n 192.168.6.10:9876 -b 192.168.6.20:10911

Topic和消息测试

# 创建测试Topic
./mqadmin updateTopic -n 192.168.6.10:9876 -c EnterpriseRocketMQCluster -t TestTopic -w 4 -r 4

# 查看Topic列表
./mqadmin topicList -n 192.168.6.10:9876

# 查看Topic路由信息
./mqadmin topicRoute -n 192.168.6.10:9876 -t TestTopic

# 发送测试消息
./mqadmin sendMessage -n 192.168.6.10:9876 -t TestTopic -p "Hello RocketMQ Cluster"

# 消费消息
./mqadmin consumerProgress -n 192.168.6.10:9876 -g TestConsumerGroup
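
除了 mqadmin 命令行,也可以从应用侧用几行Java代码验证集群的收发链路。下面是一个基于 rocketmq-client 4.x API 的示意性示例(生产者组名 verify-producer-group 为本文自行假设),向上面创建的 TestTopic 发送一条消息:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

public class ClusterSendDemo {
    public static void main(String[] args) throws Exception {
        // 指向NameServer集群,客户端自动发现Broker并做负载均衡
        DefaultMQProducer producer = new DefaultMQProducer("verify-producer-group");
        producer.setNamesrvAddr("192.168.6.10:9876;192.168.6.11:9876;192.168.6.12:9876");
        producer.start();
        try {
            Message msg = new Message("TestTopic", "VERIFY",
                    "Hello RocketMQ Cluster".getBytes(StandardCharsets.UTF_8));
            SendResult result = producer.send(msg);
            // SEND_OK 表示写入成功;多次发送时 messageQueue 分布在不同Broker上,说明集群路由正常
            System.out.println(result.getSendStatus() + " -> " + result.getMessageQueue());
        } finally {
            producer.shutdown();
        }
    }
}

如果返回 SEND_OK,且多次发送时队列分布在不同的 Broker 上,说明 NameServer 寻址与 Broker 写入均正常。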

性能压力测试

# 使用自带性能测试工具
# 生产者性能测试
./mqadmin org.apache.rocketmq.example.quickstart.Producer -n 192.168.6.10:9876 -t PerformanceTestTopic -s 1024 -c 10000

# 消费者性能测试
./mqadmin org.apache.rocketmq.example.quickstart.Consumer -n 192.168.6.10:9876 -t PerformanceTestTopic -g PerfTestConsumer

# 使用benchmark工具
cd /opt/rocketmq/bin
./tools.sh org.apache.rocketmq.example.benchmark.Producer \
  -n 192.168.6.10:9876 \
  -t BenchmarkTest \
  -w 16 \
  -s 1024 \
  -c 100000

./tools.sh org.apache.rocketmq.example.benchmark.Consumer \
  -n 192.168.6.10:9876 \
  -t BenchmarkTest \
  -g BenchmarkConsumer \
  -w 16

故障转移测试

# 查看当前Broker状态
./mqadmin clusterList -n 192.168.6.10:9876

# 停止一个主Broker
ssh broker-master1 "cd /opt/rocketmq/bin && ./stop-broker.sh"

# 等待30秒,观察自动故障转移
sleep 30

# 检查从节点是否提升为主节点
./mqadmin clusterList -n 192.168.6.10:9876

# 测试消息发送(应该仍然正常工作)
./mqadmin sendMessage -n 192.168.6.10:9876 -t TestTopic -p "Message after failover"

# 恢复故障节点
ssh broker-master1 "cd /opt/rocketmq/bin && ./start-broker.sh broker-master1.properties"

# 检查数据同步
sleep 30
./mqadmin brokerStatus -n 192.168.6.10:9876 -b 192.168.6.20:10911

数据一致性测试

# 创建顺序消息Topic
./mqadmin updateTopic -n 192.168.6.10:9876 -c EnterpriseRocketMQCluster -t OrderTestTopic -w 1 -r 1 -a +message.type=ORDER

# 发送顺序消息
for i in {1..100}; do
  ./mqadmin sendMessage -n 192.168.6.10:9876 -t OrderTestTopic -k "ORDER_KEY_$i" -p "Ordered message $i"
done

# 消费并验证顺序
./mqadmin queryMsgByKey -n 192.168.6.10:9876 -t OrderTestTopic -k ORDER_KEY_1

# 检查消息堆积
./mqadmin consumerProgress -n 192.168.6.10:9876 -g OrderTestConsumer

监控和告警测试

# 安装Prometheus exporter(在监控节点上)
wget https://github.com/apache/rocketmq-exporter/releases/download/v0.0.2/rocketmq-exporter-0.0.2.jar

# 启动exporter
nohup java -jar rocketmq-exporter-0.0.2.jar \
  --rocketmq.config.namesrvAddr="192.168.6.10:9876;192.168.6.11:9876;192.168.6.12:9876" \
  --server.port=5557 \
  > /data/rocketmq-exporter/exporter.log 2>&1 &

# 检查监控指标
curl http://192.168.6.40:5557/metrics

# 配置Grafana仪表板
# 导入RocketMQ监控仪表板

⑧、监控配置

创建集群健康检查脚本

sudo tee /usr/local/bin/rocketmq_cluster_check.sh << 'EOF'
#!/bin/bash

echo "=== RocketMQ集群健康检查 ==="
echo "时间: $(date)"

# 检查NameServer节点
nameservers=("192.168.6.10" "192.168.6.11" "192.168.6.12")
for ns in "${nameservers[@]}"; do
  if curl -s "http://$ns:9876" > /dev/null; then
    status="✓ 在线"
  else
    status="✗ 离线"
  fi
  echo "NameServer $ns: $status"
done

# 检查Broker节点
brokers=("192.168.6.20:10911" "192.168.6.21:10911" "192.168.6.22:10911" "192.168.6.23:10911")
for broker in "${brokers[@]}"; do
  ip=$(echo $broker | cut -d: -f1)
  port=$(echo $broker | cut -d: -f2)
  if nc -z $ip $port; then
    status="✓ 在线"
  else
    status="✗ 离线"
  fi
  echo "Broker $broker: $status"
done

# 检查控制台
if curl -s "http://192.168.6.30:8080" > /dev/null; then
  echo "控制台: ✓ 在线"
else
  echo "控制台: ✗ 离线"
fi

# 检查集群状态
echo -e "\n集群状态:"
cd /opt/rocketmq/bin
./mqadmin clusterList -n 192.168.6.10:9876 2>/dev/null | grep -E "(Broker|clusterName)"

# 检查Topic状态
echo -e "\nTopic状态:"
./mqadmin topicList -n 192.168.6.10:9876 2>/dev/null | head -5

# 检查消息堆积
echo -e "\n消息堆积情况:"
./mqadmin consumerProgress -n 192.168.6.10:9876 2>/dev/null | head -5

echo -e "\n=== 检查完成 ==="
EOF

sudo chmod +x /usr/local/bin/rocketmq_cluster_check.sh

配置日志监控

# 配置logrotate
sudo tee /etc/logrotate.d/rocketmq << 'EOF'
/data/rocketmq/logs/*.log {
    daily
    missingok
    rotate 30
    compress
    delaycompress
    notifempty
    copytruncate
    size 100M
}
EOF

# 配置日志监控脚本
sudo tee /usr/local/bin/rocketmq_log_monitor.sh << 'EOF'
#!/bin/bash
# 监控错误日志
ERROR_COUNT=$(grep -c "ERROR" /data/rocketmq/logs/broker.log 2>/dev/null || echo "0")
if [ "$ERROR_COUNT" -gt 10 ]; then
  echo "发现 $ERROR_COUNT 个错误日志,请检查!"
fi

# 监控GC日志
GC_COUNT=$(grep -c "Full GC" /data/rocketmq/logs/broker_gc.log 2>/dev/null || echo "0")
if [ "$GC_COUNT" -gt 0 ]; then
  echo "发现 $GC_COUNT 次Full GC,请优化JVM配置!"
fi
EOF

sudo chmod +x /usr/local/bin/rocketmq_log_monitor.sh

SpringBoot进行整合

统一的配置管理:集中管理所有集群组件的配置

优雅的故障处理:完善的异常处理和重试机制

性能优化:连接池、缓存、异步处理等优化

监控告警:健康检查和性能监控

测试覆盖:完整的单元测试和集成测试

可扩展性:易于添加新的集群组件和服务

┌─────────────────────────────────────────────────────────────┐
│                 SpringBoot微服务集群                        │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │  服务A      │  │  服务B      │  │  服务C      │         │
│  │ (业务逻辑)  │  │ (业务逻辑)  │  │ (业务逻辑)  │         │
│  └─────┬───────┘  └─────┬───────┘  └─────┬───────┘         │
│        │                │                │                 │
└────────┼────────────────┼────────────────┼─────────────────┘│                │                │
┌────────┼────────────────┼────────────────┼─────────────────┐
│        ▼                ▼                ▼                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                SpringCloud Gateway                 │   │
│  │               (统一网关和负载均衡)                  │   │
│  └─────────────────────────┬───────────────────────────┘   │
│                            │                               │
└────────────────────────────┼───────────────────────────────┘│
┌────────────────────────────┼───────────────────────────────┐
│                   集群组件集成层                           │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐       │
│  │ MySQL   │  │ Redis   │  │  ES     │  │ RocketMQ│       │
│  │ 集群    │  │ 集群    │  │ 集群    │  │ 集群    │       │
│  └─────────┘  └─────────┘  └─────────┘  └─────────┘       │
└───────────────────────────────────────────────────────────┘

一、依赖

<properties>
    <java.version>11</java.version>
    <spring-boot.version>2.7.0</spring-boot.version>
    <spring-cloud.version>2021.0.3</spring-cloud.version>
</properties>

<dependencies>
    <!-- SpringBoot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>

    <!-- 集群客户端 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>

    <!-- 连接池 -->
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
    </dependency>

    <!-- 工具类 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>31.1-jre</version>
    </dependency>

    <!-- 测试 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

二、与MySQL进行整合

# application-mysql.yml
spring:datasource:dynamic:primary: master #指定默认的、主要的数据源为 master。当操作没有明确指定数据源时,会使用这个strict: true #启用严格模式。在严格模式下,如果尝试查找一个不存在的数据源,将会抛出异常。如果为 false,则会使用 primary 指定的主数据源作为回退datasource:master: #负责写入操作(INSERT, UPDATE, DELETE)url: jdbc:mysql://192.168.2.10:3306/enterprise_db?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=trueusername: shardingpassword: Sharding123!@#driver-class-name: com.mysql.cj.jdbc.Driverslave1: #这些是从库 (Slave),通常负责只读操作(SELECT)url: jdbc:mysql://192.168.2.11:3306/enterprise_db?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=trueusername: shardingpassword: Sharding123!@#driver-class-name: com.mysql.cj.jdbc.Driverslave2: #这些是从库 (Slave),通常负责只读操作(SELECT)url: jdbc:mysql://192.168.2.12:3306/enterprise_db?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=trueusername: shardingpassword: Sharding123!@#driver-class-name: com.mysql.cj.jdbc.Driverhikari:maximum-pool-size: 20 #这些是从库 (Slave),通常负责只读操作(SELECT)minimum-idle: 5 #连接池中维护的最小空闲连接数。connection-timeout: 30000 #客户端等待连接的最大毫秒数(30秒)。idle-timeout: 600000 # 连接在池中闲置多久后会被释放(10分钟)。max-lifetime: 1800000 #一个连接的最大生命周期(30分钟),超时即使空闲也会被回收shardingsphere:datasource: #声明 ShardingSphere 要管理的数据源名称列表,这些名称必须与第一步中定义的 key(master, slave1, slave2)一致names: master,slave1,slave2props:#设置为 true 后,ShardingSphere 会在控制台打印出执行的实际 SQL 语句及其路由到的数据源,方便开发者排查问题sql-show: true rules:replica-query:data-sources:readwrite_ds: #定义了一个逻辑数据源 readwrite_ds。你的应用程序在代码中操作数据库时,只需要面向这个逻辑数据源即可,无需关心底层是主库还是从库。primary-data-source-name: master# 指定该逻辑数据源的写节点对应物理数据源 masterreplica-data-source-names: slave1,slave2 #指定该逻辑数据源的读节点列表,对应物理数据源 slave1 和 slave2load-balancer-name: round_robin #指定用于在从库中进行负载均衡的策略的名称load-balancers:round_robin:type: ROUND_ROBIN #负载均衡算法类型为轮询。这意味着应用程序的查询请求会按照顺序依次分发到 slave1 -> slave2 -> slave1 -> slave2,从而实现读流量的负载均衡。

在单纯读写分离中,我们使用 replica-query 规则。在分库分表中,我们需要使用 sharding 规则来定义分片策略,并可以在每一个分片数据源内部再嵌套 replica-query 规则来实现该分片自身的读写分离。
核心变化:从 replica-query 到 sharding + replica-query

  • 根据 user_id 字段进行分库分表
  • 数据库:ds0, ds1 (替代原来的 master, slave1…)
  • 物理节点:每个逻辑库(如 ds0)背后也是一个一主二从的集群(如 ds0_master, ds0_slave1, ds0_slave2)
  • 表:t_user_0, t_user_1
spring:datasource:dynamic:# 注意:primary 和 strict 在分片模式下通常由 ShardingSphere 接管,这里可以忽略或删除datasource:# --- ds0 分片的数据源集群 ---ds0_master:url: jdbc:mysql://192.168.2.10:3306/db0?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=trueusername: shardingpassword: Sharding123!@#driver-class-name: com.mysql.cj.jdbc.Driverds0_slave1:url: jdbc:mysql://192.168.2.11:3306/db0?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=trueusername: shardingpassword: Sharding123!@#driver-class-name: com.mysql.cj.jdbc.Driverds0_slave2:url: jdbc:mysql://192.168.2.12:3306/db0?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=trueusername: shardingpassword: Sharding123!@#driver-class-name: com.mysql.cj.jdbc.Driver# --- ds1 分片的数据源集群 ---ds1_master:url: jdbc:mysql://192.168.3.10:3306/db1?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=trueusername: shardingpassword: Sharding123!@#driver-class-name: com.mysql.cj.jdbc.Driverds1_slave1:url: jdbc:mysql://192.168.3.11:3306/db1?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=trueusername: shardingpassword: Sharding123!@#driver-class-name: com.mysql.cj.jdbc.Driverds1_slave2:url: jdbc:mysql://192.168.3.11:3306/db1?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=trueusername: shardingpassword: Sharding123!@#driver-class-name: com.mysql.cj.jdbc.Driverhikari:maximum-pool-size: 20minimum-idle: 5connection-timeout: 30000idle-timeout: 600000max-lifetime: 1800000shardingsphere:datasource:# 定义所有物理数据源的名称names: ds0_master, ds0_slave1, ds0_slave2, ds1_master, ds1_slave1, ds1_slave2props:sql-show: truerules:# 1. 定义分片规则sharding:# 分片表配置tables:t_user:# 逻辑表对应的实际数据节点,符合 Groovy 语法# 表示数据分布在 ds0和ds1 的 t_user_0 和 t_user_1 表上actual-data-nodes: ds$->{0..1}.t_user_$->{0..1}# 分库策略database-strategy:standard:sharding-column: user_idsharding-algorithm-name: db_hash_mod# 分表策略table-strategy:standard:sharding-column: user_idsharding-algorithm-name: table_hash_mod# 分片算法定义sharding-algorithms:# 分库算法:库号 = user_id % 2db_hash_mod:type: INLINEprops:algorithm-expression: ds$->{user_id % 2}# 分表算法:表号 = (user_id / 2) % 2table_hash_mod:type: INLINEprops:algorithm-expression: t_user_$->{(user_id / 2) % 2}# 2. 定义读写分离规则 (为每个分片数据源集群配置)replica-query:data-sources:# 定义逻辑分片 ds0,它对应一个主从集群ds0:primary-data-source-name: ds0_masterreplica-data-source-names: ds0_slave1, ds0_slave2load-balancer-name: round_robin# 定义逻辑分片 ds1,它对应另一个主从集群ds1:primary-data-source-name: ds1_masterreplica-data-source-names: ds1_slave1, ds1_slave2load-balancer-name: round_robinload-balancers:round_robin:type: ROUND_ROBIN

分库分表和纯读写分离区别

①、数据源定义:

  • 之前:master, slave1, slave2(直接对应物理角色)
  • 现在:ds0_master, ds0_slave1, ds1_master, ds1_slave1…(名称体现了 分片 和 角色)

②、规则结构:

  • 之前:只有 replica-query 规则。
  • 现在:同时存在 sharding 和 replica-query 规则,并且 replica-query 是为 sharding 服务的。

③、读写负载均衡的工作流程

  • 第1层路由 (分片路由):当一个SQL请求进来,ShardingSphere 首先根据 sharding 规则中的 sharding-column (如 user_id = 123) 和算法,计算出这条数据应该落在哪个逻辑分片(ds0 或 ds1)
  • 第2层路由 (读写路由):确定目标逻辑分片(例如 ds0)后,ShardingSphere 再查看 replica-query 规则中 ds0 的配置。
    如果是写请求,则路由到该逻辑分片对应的主库 ds0_master
    如果是读请求,则根据配置的round_robin负载均衡算法,从从库列表 (ds0_slave1, ds0_slave2) 中选择一个进行查询

④、逻辑数据源

  • 之前:应用程序操作的是 readwrite_ds 这个逻辑数据源。
  • 现在:应用程序操作的是 t_user 这样的逻辑表。ShardingSphere 会自动根据分片键找到逻辑分片(ds0, ds1),再根据读写类型找到最终的物理数据库

总结:在分库分表场景下,读写分离的负载均衡策略是分层和嵌套的

  1. 分片层:通过分片算法(如取模、范围等)实现数据的负载均衡,将数据分散到不同的数据库分片上。
  2. 读写层:在每个独立的分片内部,通过副本查询负载均衡算法(如轮询、随机等)实现读流量的负载均衡,将查询请求分散到该分片的多个从库上。

这种设计同时解决了数据容量水平扩展和读性能水平扩展两个核心问题,是构建大型分布式应用常用的数据库架构。
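
为了直观理解上述两层路由,下面给出一个示意性的计算示例(只模拟 INLINE 表达式的取模逻辑,并非 ShardingSphere 内部实现):

public class ShardingRouteDemo {

    // 对应 algorithm-expression: ds$->{user_id % 2}
    static String routeDatabase(long userId) {
        return "ds" + (userId % 2);
    }

    // 对应 algorithm-expression: t_user_$->{(user_id / 2) % 2}
    static String routeTable(long userId) {
        return "t_user_" + ((userId / 2) % 2);
    }

    public static void main(String[] args) {
        for (long userId : new long[]{4, 5, 6, 7}) {
            System.out.printf("user_id=%d -> %s.%s%n",
                    userId, routeDatabase(userId), routeTable(userId));
        }
        // 输出:
        // user_id=4 -> ds0.t_user_0
        // user_id=5 -> ds1.t_user_0
        // user_id=6 -> ds0.t_user_1
        // user_id=7 -> ds1.t_user_1
    }
}

例如 user_id=7 的写请求最终落在 ds1_master 的 t_user_1 上,而对它的读请求会在 ds1_slave1、ds1_slave2 之间轮询。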

动态数据源配置

@Configuration
@EnableTransactionManagement
@MapperScan("com.enterprise.mapper")
public class DataSourceConfig {@Bean@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.master")public DataSource masterDataSource() {return DataSourceBuilder.create().build();}@Bean@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.slave1")public DataSource slave1DataSource() {return DataSourceBuilder.create().build();}@Bean@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.slave2")public DataSource slave2DataSource() {return DataSourceBuilder.create().build();}@Beanpublic DataSource dynamicDataSource() {Map<Object, Object> targetDataSources = new HashMap<>();targetDataSources.put("master", masterDataSource());targetDataSources.put("slave1", slave1DataSource());targetDataSources.put("slave2", slave2DataSource());DynamicDataSource dynamicDataSource = new DynamicDataSource();dynamicDataSource.setTargetDataSources(targetDataSources);dynamicDataSource.setDefaultTargetDataSource(masterDataSource());return dynamicDataSource;}@Beanpublic PlatformTransactionManager transactionManager() {return new DataSourceTransactionManager(dynamicDataSource());}
}public class DynamicDataSource extends AbstractRoutingDataSource {private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();public static void setDataSource(String dataSource) {CONTEXT_HOLDER.set(dataSource);}public static String getDataSource() {return CONTEXT_HOLDER.get();}public static void clearDataSource() {CONTEXT_HOLDER.remove();}@Overrideprotected Object determineCurrentLookupKey() {return getDataSource();}
}

读写分离注解

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ReadOnly {
    String value() default "slave";
}

@Aspect
@Component
public class ReadOnlyAspect {

    @Before("@annotation(readOnly)")
    public void setReadDataSource(ReadOnly readOnly) {
        DynamicDataSource.setDataSource(readOnly.value());
    }

    @After("@annotation(readOnly)")
    public void clearDataSource(ReadOnly readOnly) {
        DynamicDataSource.clearDataSource();
    }
}

三、Redis集群整合

# application-redis.yml
spring:redis:cluster:nodes:- 192.168.3.10:6379- 192.168.3.11:6379- 192.168.3.12:6379- 192.168.3.13:6379- 192.168.3.14:6379- 192.168.3.15:6379max-redirects: 3lettuce:pool:max-active: 20max-wait: -1max-idle: 8min-idle: 0cluster:refresh:adaptive: trueperiod: 2000password: RedisCluster123!@#timeout: 5000
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(redisConnectionFactory);// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper mapper = new ObjectMapper();mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);mapper.activateDefaultTyping(mapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);serializer.setObjectMapper(mapper);template.setValueSerializer(serializer);template.setHashValueSerializer(serializer);template.setKeySerializer(new StringRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;}@Beanpublic RedissonClient redissonClient() {Config config = new Config();config.useClusterServers().addNodeAddress("redis://192.168.3.10:6379","redis://192.168.3.11:6379","redis://192.168.3.12:6379","redis://192.168.3.13:6379","redis://192.168.3.14:6379","redis://192.168.3.15:6379").setPassword("RedisCluster123!@#").setTimeout(5000).setRetryAttempts(3).setRetryInterval(1500);return Redisson.create(config);}
}

Redis服务类

@Service
@Slf4j
public class RedisService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate RedissonClient redissonClient;public boolean set(String key, Object value, long time) {try {if (time > 0) {redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);} else {redisTemplate.opsForValue().set(key, value);}return true;} catch (Exception e) {log.error("Redis set error: {}", e.getMessage());return false;}}public Object get(String key) {return key == null ? null : redisTemplate.opsForValue().get(key);}public boolean acquireLock(String lockKey, String requestId, int expireTime) {RLock lock = redissonClient.getLock(lockKey);try {return lock.tryLock(0, expireTime, TimeUnit.SECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}public void releaseLock(String lockKey) {RLock lock = redissonClient.getLock(lockKey);if (lock.isHeldByCurrentThread()) {lock.unlock();}}
}
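
下面给出 acquireLock/releaseLock 的一个典型调用方式(示意代码,StockService 为本文假设的业务类,沿用上文的 RedisService);关键点是把释放锁放在 finally 中,避免业务异常导致锁长时间不释放:

@Service
public class StockService {

    @Autowired
    private RedisService redisService;

    public void deductStock(Long productId) {
        // 以商品ID为粒度加锁,同一商品的扣减操作互斥
        String lockKey = "stock_lock:" + productId;
        // 最多持有锁10秒,超时自动释放,防止死锁
        boolean locked = redisService.acquireLock(lockKey, UUID.randomUUID().toString(), 10);
        if (!locked) {
            throw new BusinessException("系统繁忙,请稍后重试");
        }
        try {
            // 这里执行扣减库存等需要互斥的业务逻辑
        } finally {
            redisService.releaseLock(lockKey);
        }
    }
}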

四、Elasticsearch集群整合

# application-es.yml
spring:elasticsearch: #用于配置 Elasticsearch 高级 REST 客户端 (High-Level REST Client)。这是 Spring Boot 2.3+ 版本后推荐的用于 HTTP 通信的客户端uris: #客户端会使用这些地址进行负载均衡和故障转移。如果其中一个节点不可用,客户端会自动尝试连接列表中的其他节点。- http://192.168.4.10:9200- http://192.168.4.11:9200- http://192.168.4.12:9200username: "" #用于连接启用了安全认证(如 X-Pack 安全功能)的 Elasticsearch 集群的凭据password: "" #本例中为空字符串 "",表示当前连接的 Elasticsearch 集群没有启用用户名密码认证。如果启用了认证,这里应填写类似 elastic:your_password 的凭据connection-timeout: 3000 #客户端尝试与 Elasticsearch 服务器建立连接时,如果在 3 秒内还没有成功,则会抛出连接超时异常socket-timeout: 5000 #客户端在等待服务器响应时,如果连续 5 秒内没有收到任何数据,则会抛出套接字超时异常。这防止了因为网络或服务器问题导致的请求无限期挂起。data:elasticsearch: #用于控制 Repository 的自动配置和传统的传输客户端(Transport Client,已废弃)的配置repositories:enabled: true #设置为 true 后,Spring Boot 会自动扫描并创建实现了 ElasticsearchRepository 接口的 Bean。你可以通过编写接口来自动生成查询数据访问层代码,类似于 JPA Repository。这是推荐的使用方式。cluster-nodes: 192.168.4.10:9300,192.168.4.11:9300,192.168.4.12:9300 #(已过时的配置) 用于配置旧的传输客户端 (Transport Client) 的连接地址。cluster-name: enterprise-es-cluster #已过时的配置) 指定 Elasticsearch 集群的名称,用于传输客户端验证。

现代配置 vs 遗留配置:

  • 这个配置文件是一个混合体,同时包含了现代 REST 客户端的配置 (spring.elasticsearch) 和已过时的传输客户端的配置
    (spring.data.elasticsearch.cluster-nodes 和 cluster-name)。
  • 对于新项目,你应该移除 cluster-nodes 和 cluster-name 这两个配置项,因为它们不再起作用且会造成混淆。

最终生效的配置:

  • 在 Spring Boot 2.3+ 版本中,spring.elasticsearch.uris 的配置是生效的。应用程序将通过 REST
    客户端连接到 9200 端口的三个节点。
  • repositories.enabled: true 是有效的,它启用了方便的 Repository 编程模式。

连接方式:

  • 应用程序实际使用的是 HTTP REST 客户端,通过 9200 端口与集群通信。
  • 配置中出现的 9300 端口不会被使用(除非你做了额外的、不推荐的配置来强制使用传输客户端)。
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.enterprise.es.repository")
public class ElasticsearchConfig {@Value("${spring.elasticsearch.uris}")private List<String> uris;@Beanpublic RestHighLevelClient restHighLevelClient() {HttpHost[] httpHosts = uris.stream().map(uri -> {String[] parts = uri.split(":");return new HttpHost(parts[0], Integer.parseInt(parts[1]), "http");}).toArray(HttpHost[]::new);return new RestHighLevelClient(RestClient.builder(httpHosts).setRequestConfigCallback(requestConfigBuilder ->requestConfigBuilder.setConnectTimeout(5000).setSocketTimeout(60000)).setHttpClientConfigCallback(httpClientBuilder ->httpClientBuilder.setMaxConnTotal(100).setMaxConnPerRoute(50)));}@Beanpublic ElasticsearchOperations elasticsearchTemplate() {return new ElasticsearchRestTemplate(restHighLevelClient());}
}
@Service
@Slf4j
public class ElasticsearchService {@Autowiredprivate RestHighLevelClient restHighLevelClient;public boolean createIndex(String indexName) {try {CreateIndexRequest request = new CreateIndexRequest(indexName);request.settings(Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 2));CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);return response.isAcknowledged();} catch (Exception e) {log.error("Create index error: {}", e.getMessage());return false;}}public <T> boolean indexDocument(String indexName, String id, T document) {try {IndexRequest request = new IndexRequest(indexName).id(id).source(convertToMap(document), XContentType.JSON).timeout(TimeValue.timeValueSeconds(5));IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);return response.status() == RestStatus.CREATED;} catch (Exception e) {log.error("Index document error: {}", e.getMessage());return false;}}public <T> List<T> search(String indexName, SearchSourceBuilder sourceBuilder, Class<T> clazz) {try {SearchRequest searchRequest = new SearchRequest(indexName);searchRequest.source(sourceBuilder);SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);return Arrays.stream(response.getHits().getHits()).map(hit -> {try {return objectMapper.readValue(hit.getSourceAsString(), clazz);} catch (Exception e) {log.error("Parse search result error: {}", e.getMessage());return null;}}).filter(Objects::nonNull).collect(Collectors.toList());} catch (Exception e) {log.error("Search error: {}", e.getMessage());return Collections.emptyList();}}private Map<String, Object> convertToMap(Object obj) {ObjectMapper mapper = new ObjectMapper();return mapper.convertValue(obj, Map.class);}
}

五、RocketMQ集群整合

# application-rocketmq.yml
rocketmq:name-server: 192.168.6.10:9876;192.168.6.11:9876;192.168.6.12:9876producer:group: enterprise-producer-groupsend-message-timeout: 3000compress-message-body-threshold: 4096max-message-size: 4194304retry-times-when-send-async-failed: 2retry-times-when-send-failed: 2consumer:group: enterprise-consumer-groupconsume-thread-min: 5consume-thread-max: 32consume-timeout: 15suspend-current-queue-time-millis: 1000max-reconsume-times: 3
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
public class RocketMQConfig {@Autowiredprivate RocketMQProperties rocketMQProperties;@Beanpublic DefaultMQProducer defaultMQProducer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer(rocketMQProperties.getProducer().getGroup());producer.setNamesrvAddr(rocketMQProperties.getNameServer());producer.setSendMsgTimeout(rocketMQProperties.getProducer().getSendMessageTimeout());producer.setCompressMsgBodyOverHowmuch(rocketMQProperties.getProducer().getCompressMessageBodyThreshold());producer.setMaxMessageSize(rocketMQProperties.getProducer().getMaxMessageSize());producer.setRetryTimesWhenSendAsyncFailed(rocketMQProperties.getProducer().getRetryTimesWhenSendAsyncFailed());producer.setRetryTimesWhenSendFailed(rocketMQProperties.getProducer().getRetryTimesWhenSendFailed());producer.start();return producer;}@Beanpublic DefaultMQPushConsumer orderConsumer() throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group");consumer.setNamesrvAddr(rocketMQProperties.getNameServer());consumer.subscribe("ORDER_TOPIC", "*");consumer.setConsumeThreadMin(rocketMQProperties.getConsumer().getConsumeThreadMin());consumer.setConsumeThreadMax(rocketMQProperties.getConsumer().getConsumeThreadMax());consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {// 顺序消息处理逻辑return ConsumeOrderlyStatus.SUCCESS;});consumer.start();return consumer;}
}

MQ服务类

@Service
@Slf4j
public class RocketMQService {@Autowiredprivate DefaultMQProducer producer;public SendResult sendMessage(String topic, String tags, Object message) {try {String messageBody = JSON.toJSONString(message);Message msg = new Message(topic, tags, messageBody.getBytes(StandardCharsets.UTF_8));return producer.send(msg);} catch (Exception e) {log.error("Send message error: {}", e.getMessage());throw new BusinessException("消息发送失败");}}public SendResult sendAsyncMessage(String topic, String tags, Object message, SendCallback callback) {try {String messageBody = JSON.toJSONString(message);Message msg = new Message(topic, tags, messageBody.getBytes(StandardCharsets.UTF_8));producer.send(msg, callback);return null;} catch (Exception e) {log.error("Send async message error: {}", e.getMessage());throw new BusinessException("异步消息发送失败");}}public void sendDelayMessage(String topic, String tags, Object message, int delayLevel) {try {String messageBody = JSON.toJSONString(message);Message msg = new Message(topic, tags, messageBody.getBytes(StandardCharsets.UTF_8));msg.setDelayTimeLevel(delayLevel);producer.send(msg);} catch (Exception e) {log.error("Send delay message error: {}", e.getMessage());throw new BusinessException("延迟消息发送失败");}}
}@Component
@RocketMQMessageListener(topic = "BUSINESS_TOPIC",consumerGroup = "business-consumer-group",consumeMode = ConsumeMode.CONCURRENTLY,messageModel = MessageModel.CLUSTERING
)
@Slf4j
public class BusinessMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {try {String body = new String(message.getBody(), StandardCharsets.UTF_8);log.info("Received message: {}", body);// 业务处理逻辑processBusinessMessage(body);} catch (Exception e) {log.error("Process message error: {}", e.getMessage());throw new RuntimeException("消息处理失败", e);}}private void processBusinessMessage(String messageBody) {// 具体的业务处理逻辑}
}

六、MongoDB集群整合

认证信息: clusterAdmin:ClusterAdmin123!@#

  • 使用用户名 clusterAdmin 和密码 ClusterAdmin123!@#
    连接数据库。这是一个具有集群管理权限的账号,权限很大,通常建议为应用创建专属的、权限最小化的用户。

服务器地址: 192.168.5.10:27017,192.168.5.11:27017,192.168.5.12:27017

  • 明确指定了副本集的三个成员节点。应用程序可以通过其中任何一个节点发现整个副本集的所有成员。

目标数据库: /enterprise_db

  • 应用程序默认使用的数据库是 enterprise_db。

副本集名称: replicaSet=configReplSet

  • 指定副本集的名称必须为 configReplSet。驱动程序会验证连接的副本集名称是否与此一致,这是一种安全措施,防止错误连接到其他集群。

认证源: authSource=admin

  • 指定用户 clusterAdmin 的凭据信息是在 admin 数据库中创建的。这是管理用户的常见做法。
# application-mongodb.yml
spring:data:mongodb:uri: mongodb://clusterAdmin:ClusterAdmin123!@#192.168.5.10:27017,192.168.5.11:27017,192.168.5.12:27017/enterprise_db?replicaSet=configReplSet&authSource=adminoptions:maxPoolSize: 100 #连接池允许的最大连接数。防止应用创建过多连接压垮数据库。根据应用并发需求和数据库处理能力设置。minPoolSize: 10 #连接池始终保持的最小空闲连接数。应用启动后就会建立这些连接,以便快速响应请求,避免初次请求的建连开销maxIdleTimeMS: 300000 #空闲连接在池中存活的最长时间。超过此时间,空闲连接将被关闭释放资源。有助于在流量低谷时缩减连接数maxLifeTimeMS: 3600000 #一个连接从创建到销毁的最大生命周期。即使连接空闲,超过此时间也会被替换。有助于防止长时间运行的连接可能遇到的网络问题或状态异常。connectTimeoutMS: 5000 #一个连接从创建到销毁的最大生命周期。即使连接空闲,超过此时间也会被替换。有助于防止长时间运行的连接可能遇到的网络问题或状态异常。socketTimeoutMS: 60000 #发送请求后,等待服务器响应的超时时间。重要提示:这不包括连接建立时间,而是指网络传输和数据库操作的总时间。设置过短可能导致长时间操作失败。serverSelectionTimeoutMS: 30000 #驱动程序尝试选择一台可用的服务器(Primary, Secondary)进行操作的最大时间。如果30秒内找不到符合要求的服务器(如找不到Primary用于写操作),则抛出异常。这是整体操作超时的一个重要指标。readPreference: secondaryPreferred #优先从副本集的 Secondary(从节点)读取数据,只有当所有 Secondary 都不可用时,才从 Primary(主节点)读取。这是一种经典的读写分离配置,将读请求分散到多个从节点,减轻主节点的压力,提高系统的读吞吐量。适用于对数据实时性要求不非常严格的读场景(因为从节点的数据可能稍落后于主节点)。readConcern: majority #只读取已被副本集中大多数节点确认并持久化的数据,避免了读取到可能因网络分区而回滚的数据(脏读),提供了很强的数据一致性保证。通常与 secondaryPreferred 搭配使用,即使在从节点读也能保证读到稳定数据。writeConcern: majority #写关注:要求写操作必须被副本集中大多数节点确认后才向客户端返回成功。提供了强一致性保证,确保数据在大多数节点上持久化,即使主节点宕机,数据也不会丢失。这是生产环境推荐的安全性配置,但会稍微增加写操作的延迟。mongodb:transaction:enabled: true #启用 MongoDB 的多文档事务支持(需要 MongoDB 4.0+)。这允许应用在多个文档或集合上执行 ACID 事务。retry:reads: true#启用重试:当遇到网络错误或副本集主节点切换等可重试错误时,驱动程序会自动重试失败的读/写操作。writes: true #极大提高了应用程序的容错能力和可用性,对开发者透明,无需在业务代码中编写大量重试逻辑。这是生产环境的最佳实践。
@Configuration
@EnableMongoAuditing
@EnableMongoRepositories(basePackages = "com.enterprise.mongodb.repository")
public class MongoDBConfig {@Value("${spring.data.mongodb.uri}")private String mongoUri;@Beanpublic MongoClient mongoClient() {ConnectionString connectionString = new ConnectionString(mongoUri);MongoClientSettings settings = MongoClientSettings.builder().applyConnectionString(connectionString).applyToConnectionPoolSettings(builder -> builder.maxSize(100).minSize(10).maxConnectionIdleTime(5, TimeUnit.MINUTES).maxConnectionLifeTime(1, TimeUnit.HOURS)).applyToSocketSettings(builder -> builder.connectTimeout(5, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS)).applyToServerSettings(builder -> builder.heartbeatFrequency(10, TimeUnit.SECONDS).minHeartbeatFrequency(500, TimeUnit.MILLISECONDS)).retryReads(true).retryWrites(true).readConcern(ReadConcern.MAJORITY).writeConcern(WriteConcern.MAJORITY).readPreference(ReadPreference.secondaryPreferred()).build();return MongoClients.create(settings);}@Beanpublic MongoTemplate mongoTemplate() {return new MongoTemplate(mongoClient(), "enterprise_db");}@Beanpublic MongoTransactionManager transactionManager() {return new MongoTransactionManager(mongoDbFactory());}@Beanpublic SimpleMongoClientDatabaseFactory mongoDbFactory() {return new SimpleMongoClientDatabaseFactory(mongoClient(), "enterprise_db");}@Beanpublic MappingMongoConverter mappingMongoConverter() {DbRefResolver dbRefResolver = new DefaultDbRefResolver(mongoDbFactory());MappingMongoConverter converter = new MappingMongoConverter(dbRefResolver, new MongoMappingContext());converter.setTypeMapper(new DefaultMongoTypeMapper(null));return converter;}
}
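
上面的配置注册了 MongoTransactionManager,配合副本集/分片集群(MongoDB 4.0+)即可使用多文档事务。下面是一段示意性的用法(OrderDoc、OrderLogDoc 为本文假设的文档类),两次写入要么同时成功、要么一起回滚:

@Service
public class OrderDocumentService {

    @Autowired
    private MongoTemplate mongoTemplate;

    // 事务由上文配置的 MongoTransactionManager 管理
    @Transactional
    public void saveOrderWithLog(OrderDoc order, OrderLogDoc log) {
        mongoTemplate.insert(order);
        mongoTemplate.insert(log);
    }
}

需要注意,多文档事务中的读写都必须走主节点;如果全局默认读偏好是 secondaryPreferred,事务内要单独指定 primary。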

七、业务中应用

  • 用户服务UserService
  • 订单服务OrderService
  • 数据持久化:通过 MyBatis (UserMapper) 操作数据库
  • 缓存:通过 RedisService 提升读取性能。
  • 事务管理:通过 @Transactional 保证数据一致性
  • 异步处理:通过 @Async 提升接口响应速度和处理能力。
  • 消息队列:通过 RocketMQService 实现系统解耦和事件驱动架构。
  • 搜索集成:将数据同步到 Elasticsearch。
  • 读写分离:通过自定义 @ReadOnly 注解优化读性能。
@Service
@Slf4j //一个 Lombok 注解,用于自动生成日志记录器(Logger)编译时,Lombok 会在类中自动插入一行代码:private static final Logger log = LoggerFactory.getLogger(UserService.class);。这样你就可以在类中直接使用 log.debug(), log.info(), log.error() 等方法,而无需手动声明 logger 变量。这大大简化了代码。
@Transactional //声明这个类中所有 public 方法都将在事务中执行 这是 Spring 声明式事务管理的核心注解。当方法被调用时,Spring 会自动开启一个事务;如果方法执行成功(未抛出异常),则提交事务;如果方法抛出运行时异常(RuntimeException) 或 Error,则回滚事务。它也可以加在单个方法上,此时该方法的事务设置会覆盖类级别的事务设置。
public class UserService {@Autowiredprivate UserMapper userMapper;@Autowiredprivate RedisService redisService;@Autowiredprivate RocketMQService rocketMQService;private static final String USER_CACHE_PREFIX = "user:";/**@ReadOnly 自定义注解,其意图通常是标记该方法为只读操作,可能用于指导数据库读写分离。与一个 Spring AOP 切面(Aspect)配合工作。这个切面会拦截所有带有 @ReadOnly 注解的方法,并在执行前将数据源切换到只读从库,从而实现读写分离。这是一种非常优雅的实现读写分离的方式。*/@ReadOnlypublic User getUserById(Long userId) {String cacheKey = USER_CACHE_PREFIX + userId;// 先查缓存User user = (User) redisService.get(cacheKey);if (user != null) {return user;}// 缓存不存在,查数据库user = userMapper.selectById(userId);if (user != null) {redisService.set(cacheKey, user, 3600); // 缓存1小时}return user;}public void createUser(User user) {// 写入数据库userMapper.insert(user);/**发 MQ 消息:通过 RocketMQ 发送一个“用户已创建”的领域事件(Domain Event)。其他微服务可以订阅这个消息,来实现自己的业务逻辑(如发送欢迎邮件、初始化积分等),从而实现系统解耦。*/UserCreatedEvent event = new UserCreatedEvent(user.getId(), user.getUsername());rocketMQService.sendMessage("USER_TOPIC", "CREATE", event);// 清除缓存,这是一种常见的缓存更新策略,确保下次读取时能拿到最新的数据。String cacheKey = USER_CACHE_PREFIX + user.getId();redisService.delete(cacheKey);}/**声明该方法是异步的当其他代码调用 asyncProcessUser 方法时,Spring 不会同步执行它,而是会立即返回(返回 null 或 Future)。方法的实际执行将发生在一个新的、独立的线程中,该线程由 Spring 的线程池管理。这适用于处理耗时操作且不需要立即得到结果的场景,如发送短信、记录日志、数据同步等。使用 try-catch 捕获所有异常并记录日志,这是异步方法中非常重要的一点,因为异常不会像同步调用那样抛给调用者,如果不捕获,会导致失败悄无声息。*/@Asyncpublic void asyncProcessUser(Long userId) {// 异步处理用户相关业务try {processUserData(userId);indexUserToES(userId);//用户数据同步到 Elasticsearch} catch (Exception e) {log.error("Async process user error: {}", e.getMessage());}}private void indexUserToES(Long userId) {//复用了 getUserById 方法(因此也走了缓存),然后将用户对象作为文档存入 ES 的 users 索引中User user = getUserById(userId);if (user != null) {elasticsearchService.indexDocument("users", userId.toString(), user);}}
}
@Service
@Slf4j
public class OrderService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate RedissonClient redissonClient;@Autowiredprivate RocketMQService rocketMQService;public void createOrder(Order order) {/**构造分布式锁的键。这里以用户ID为粒度进行加锁,意味着同一个用户同时只能创建一个订单,但不同用户可以并发创建。这是非常精细和合理的锁策略。*/String lockKey = "order_lock:" + order.getUserId();/**从 Redisson 客户端获取一个分布式锁对象。Redisson 是一个基于 Redis 的 Java 驻内存数据网格(In-Memory Data Grid),它提供了丰富的分布式 Java对象和服务,其中就包括分布式锁。*/RLock lock = redissonClient.getLock(lockKey);try {/**尝试获取锁,这是比直接使用 lock.lock() 更友好的方式最多等待 5 秒来获取锁,如果5秒内获取不到,则返回 false成功获取锁后,锁最多自动持有 10 秒,超过这个时间即使没有手动释放,锁也会自动失效,防止死锁。*/if (lock.tryLock(5, 10, TimeUnit.SECONDS)) {try {// 业务逻辑orderMapper.insert(order);/**把什么消息(event),发送到哪个分类(topic),并打上什么子分类标记(tags)"ORDER_TOPIC" (topic:主题):作用:消息的一级分类,用于区分不同的业务领域。解析:所有订单相关的消息都发送到这个主题。消费者通过订阅他们感兴趣的主题来接收消息。例如,积分服务 和 库存服务 都会订阅 ORDER_TOPIC。"CREATE" (tags:标签):作用:主题下的二级分类,用于更精细地区分同一业务下的不同操作类型。解析:除了 CREATE,可能还有 PAY(支付)、CANCEL(取消)、SHIP(发货)等标签。消费者可以只订阅他们关心的特定标签(如 ORDER_TOPIC:CREATE),从而过滤掉不需要的消息,非常灵活。event (body:消息体):作用:消息的实际内容,即要传递的业务数据。解析:就是我们上面创建的 OrderCreatedEvent 对象(会被序列化成字节数组)。消费者通过实现RocketMQListener,并实现onMessage方法@RocketMQMessageListener(topic = "ORDER_TOPIC",           // 订阅的主题selectorExpression = "CREATE",   // 订阅的标签,* 代表所有consumerGroup = "inventory-consumer-group" // 消费者组,用于集群消费和负载均衡)*/OrderCreatedEvent event = new OrderCreatedEvent(order);rocketMQService.sendMessage("ORDER_TOPIC", "CREATE", event);} finally {//无论业务逻辑是成功完成还是抛出异常,锁都会被释放,避免死锁lock.unlock();}} else {throw new BusinessException("系统繁忙,请稍后重试");}} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new BusinessException("创建订单失败");}}/**OrderQuery query:自定义的查询条件封装对象Pageable pageable:Spring Data 提供的分页参数对象,包含页码、每页大小、排序等信息。*/@ReadOnlypublic Page<Order> searchOrders(OrderQuery query, Pageable pageable) {// 用于构建 ES 查询的 DSL(Domain Specific Language)SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();/**用于构建布尔查询(组合查询),可以包含 must(并且)、should(或者)、must_not(非)等条件。如果有关键词,在订单号(orderNo)和商品名(productName)字段上进行 match 查询(分词查询),逻辑关系是 should(或)状态过滤:如果有状态值,进行 term 查询(精确匹配),逻辑关系是 must(且)*/BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();if (StringUtils.isNotBlank(query.getKeyword())) {boolQuery.should(QueryBuilders.matchQuery("orderNo", query.getKeyword()));boolQuery.should(QueryBuilders.matchQuery("productName", query.getKeyword()));}if (query.getStatus() != null) {boolQuery.must(QueryBuilders.termQuery("status", query.getStatus()));}/**sourceBuilder.from((int) pageable.getOffset()):设置从第几条开始查询。sourceBuilder.size(pageable.getPageSize()):设置每页大小。sourceBuilder.sort("createTime", SortOrder.DESC):按创建时间降序排列。*/sourceBuilder.query(boolQuery);sourceBuilder.from((int) pageable.getOffset());sourceBuilder.size(pageable.getPageSize());sourceBuilder.sort("createTime", SortOrder.DESC);/**最终调用一个封装的 ES 服务来执行查询,并返回一个 Page<Order> 对象,该对象包含了查询结果和分页信息(如总条数、总页数等)*/return elasticsearchService.search("orders", sourceBuilder, Order.class, pageable);}
}

八、测试用例

单元测试

@SpringBootTest
@ActiveProfiles("test")
@Slf4j
public class UserServiceTest {@Autowiredprivate UserService userService;@Autowiredprivate RedisService redisService;@MockBeanprivate RocketMQService rocketMQService;@Test@Transactionalpublic void testGetUserById() {// 准备测试数据User user = new User();user.setUsername("testuser");user.setEmail("test@example.com");userMapper.insert(user);// 测试缓存User result = userService.getUserById(user.getId());assertNotNull(result);assertEquals("testuser", result.getUsername());// 验证缓存String cacheKey = "user:" + user.getId();User cachedUser = (User) redisService.get(cacheKey);assertNotNull(cachedUser);}@Testpublic void testCreateUser() {User user = new User();user.setUsername("newuser");user.setEmail("new@example.com");userService.createUser(user);// 验证数据库User savedUser = userMapper.selectById(user.getId());assertNotNull(savedUser);// 验证消息发送verify(rocketMQService, times(1)).sendMessage(eq("USER_TOPIC"), eq("CREATE"), any(UserCreatedEvent.class));}
}

集成测试

@SpringBootTest
@ActiveProfiles("integration")
@Slf4j
public class OrderServiceIntegrationTest {

    @Autowired
    private OrderService orderService;

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private ElasticsearchService elasticsearchService;

    @Test
    public void testCreateOrderWithDistributedLock() {
        Order order = new Order();
        order.setUserId(1L);
        order.setProductId(1001L);
        order.setAmount(new BigDecimal("199.99"));

        orderService.createOrder(order);

        // Verify the order was persisted
        Order createdOrder = orderMapper.selectById(order.getId());
        assertNotNull(createdOrder);

        // Verify the ES index
        Order esOrder = elasticsearchService.getDocument(
                "orders", order.getId().toString(), Order.class);
        assertNotNull(esOrder);
    }

    @Test
    public void testOrderSearch() {
        // Prepare test data
        Order order = new Order();
        order.setOrderNo("TEST2024001");
        order.setProductName("Test Product");
        order.setStatus(1);
        orderMapper.insert(order);
        elasticsearchService.indexDocument("orders", order.getId().toString(), order);

        // Exercise the search
        OrderQuery query = new OrderQuery();
        query.setKeyword("TEST2024001");
        query.setStatus(1);

        Page<Order> result = orderService.searchOrders(query, PageRequest.of(0, 10));
        assertEquals(1, result.getTotalElements());
        assertEquals("TEST2024001", result.getContent().get(0).getOrderNo());
    }
}

Performance Tests

@SpringBootTest
@ActiveProfiles("performance")
@Slf4j
public class PerformanceTest {

    @Autowired
    private UserService userService;

    @Test
    public void testConcurrentUserAccess() throws InterruptedException {
        int threadCount = 100;
        int requestCount = 1000;
        CountDownLatch latch = new CountDownLatch(threadCount);
        Long userId = 1L;
        AtomicInteger successCount = new AtomicInteger(0);

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                for (int j = 0; j < requestCount; j++) {
                    try {
                        User user = userService.getUserById(userId);
                        if (user != null) {
                            successCount.incrementAndGet();
                        }
                    } catch (Exception e) {
                        log.error("Request error: {}", e.getMessage());
                    }
                }
                latch.countDown();
            }).start();
        }

        latch.await(30, TimeUnit.SECONDS);

        log.info("Total requests: {}, Success: {}",
                threadCount * requestCount, successCount.get());
        assertTrue(successCount.get() > threadCount * requestCount * 0.95);
    }
}

九、Monitoring and Health Checks

@Component
@Slf4j
public class ClusterHealthChecker {

    @Autowired
    private RedisService redisService;

    @Autowired
    private ElasticsearchService elasticsearchService;

    @Autowired
    private DataSource dataSource;

    @Scheduled(fixedRate = 30000)
    public void checkClusterHealth() {
        checkRedisHealth();
        checkElasticsearchHealth();
        checkDatabaseHealth();
    }

    private void checkRedisHealth() {
        try {
            redisService.set("health_check", "ok", 10);
            String result = (String) redisService.get("health_check");
            if (!"ok".equals(result)) {
                log.warn("Redis health check failed");
            }
        } catch (Exception e) {
            log.error("Redis health check error: {}", e.getMessage());
        }
    }

    private void checkElasticsearchHealth() {
        try {
            boolean exists = elasticsearchService.indexExists("health_check");
            if (!exists) {
                log.warn("Elasticsearch health check failed");
            }
        } catch (Exception e) {
            log.error("Elasticsearch health check error: {}", e.getMessage());
        }
    }

    private void checkDatabaseHealth() {
        try (Connection connection = dataSource.getConnection();
             Statement stmt = connection.createStatement()) {
            stmt.execute("SELECT 1");
        } catch (Exception e) {
            log.error("Database health check error: {}", e.getMessage());
        }
    }
}

Additional Notes

一、Core Strengths of MongoDB (why the scenarios below suit it)

  • Flexible document model (BSON): data is stored as JSON-like documents, and fields can be added or changed at any time without the costly ALTER TABLE migrations a relational database would require -- ideal for agile development (see the sketch after this list).
  • Horizontal scaling (sharding): sharding spreads data across multiple servers with little effort, handling massive data volumes and high throughput.
  • High performance: embedded documents avoid many database JOINs, and the rich index types (single-field, compound, text, geospatial, TTL, etc.) greatly speed up queries.
  • High availability (replica set): replica sets provide automatic failover and data redundancy, keeping the service available around the clock.
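As a minimal illustration of the flexible document model (the ProductCatalogDemo class and the products collection are made up for this sketch), two products with completely different attribute sets can live in the same collection, inserted through Spring Data's MongoTemplate:

import org.bson.Document;
import org.springframework.data.mongodb.core.MongoTemplate;

// Two products with different attribute sets share one collection --
// no schema migration is needed when a new attribute appears.
public class ProductCatalogDemo {

    private final MongoTemplate mongoTemplate;

    public ProductCatalogDemo(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    public void insertSampleProducts() {
        Document cpu = new Document("name", "Ryzen 9 7950X")
                .append("category", "cpu")
                .append("cores", 16)           // CPU-specific attribute
                .append("baseClockGhz", 4.5);

        Document tshirt = new Document("name", "Basic Tee")
                .append("category", "apparel")
                .append("size", "XL")          // apparel-specific attribute
                .append("color", "black");

        mongoTemplate.insert(cpu, "products");
        mongoTemplate.insert(tshirt, "products");
    }
}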

二、Typical Use Cases

  1. Content management (CMS), blogs, user profiles

Scenario: articles, blog posts, user profile pages and similar objects that can have many different attributes and nested structures.

Why it fits:

  • A single article may contain a title, author, body, tags, an array of comments, a like count and so on. One MongoDB document can represent the whole article together with all of its nested information.
  • Different articles can carry different fields, which is very flexible: some articles may have a video link while others do not, without producing any NULL-value problems.
  2. E-commerce platforms

Scenario: product catalogs, customer orders, inventory.

Why it fits:

  • Product catalog: attributes differ wildly across categories (a CPU's "core count" versus a T-shirt's "size"). MongoDB's flexible schema lets you store structurally different products in one products collection without creating a separate table per category.
  • Orders: an order document can embed its line items, shipping address, payment information and more, so reading a complete order is very efficient.
  3. Real-time analytics and high-throughput read/write workloads

Scenario: IoT, logging systems, real-time data capture (for example player state and scores in a game).

Why it fits:

  • High write throughput: MongoDB handles large volumes of inserts well, which suits data streaming in continuously from sensors or application logs.
  • Aggregation framework: the aggregation pipeline can group, count, transform and analyze this data in near real time.
  • TTL indexes: data can expire automatically (for example, logs older than 7 days are deleted), simplifying data life-cycle management; a TTL-index sketch follows this list.
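A minimal sketch of the TTL idea using Spring Data MongoDB's index API (the app_logs collection name and the createdAt field are assumptions for illustration); documents are removed automatically roughly 7 days after their createdAt timestamp:

import java.util.concurrent.TimeUnit;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.index.Index;

public class LogTtlIndexConfig {

    // Creates a TTL index so log documents expire about 7 days after createdAt.
    public void ensureLogTtlIndex(MongoTemplate mongoTemplate) {
        mongoTemplate.indexOps("app_logs").ensureIndex(
                new Index().on("createdAt", Sort.Direction.ASC)
                           .expire(7, TimeUnit.DAYS));
    }
}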
  4. Mobile apps and social networks

Scenario: user feeds, friend relationships, location-based features.

Why it fits:

  • A single feed item may contain text, images, a location, mentions, a list of likes and so on -- typical semi-structured data.
  • Geospatial queries: MongoDB natively supports geospatial indexes, so queries such as "find people nearby" or "find the closest restaurant" are efficient; see the sketch after this list.
  • A social graph (who follows whom) can also be modeled as documents, but for very complex graphs a dedicated graph database may be the better choice.
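To make the "find nearby" case concrete, here is a minimal sketch using Spring Data's NearQuery (the places collection and its 2dsphere-indexed location field are assumptions for illustration):

import org.bson.Document;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metrics;
import org.springframework.data.geo.Point;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.NearQuery;

public class NearbySearchDemo {

    // Returns documents within 2 km of the given coordinate; requires a 2dsphere index on "location".
    public GeoResults<Document> findNearby(MongoTemplate mongoTemplate, double lng, double lat) {
        NearQuery nearQuery = NearQuery.near(new Point(lng, lat))
                .maxDistance(new Distance(2, Metrics.KILOMETERS))
                .spherical(true);
        return mongoTemplate.geoNear(nearQuery, Document.class, "places");
    }
}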
  5. Caching and session storage

Scenario: storing user session data.

Why it fits:

  • Session data (user ID, login state, shopping-cart items) can be stored and retrieved as a single complete document, which is very fast.
  • Combined with a TTL index, sessions can expire automatically with no manual cleanup.

三、Scenarios Where MongoDB Is a Poor Fit

MongoDB is powerful, but it is not a silver bullet. In the following cases another database is usually the better choice:

Complex multi-table transactional workloads:

  • Problem: MongoDB does support multi-document transactions, but their performance and feature completeness still lag behind traditional relational databases (such as PostgreSQL).
  • Alternative: systems that need highly complex transactions (for example a bank's core trading system) are better served by a relational database.

Highly normalized data structures:

  • Problem: if your data model is inherently highly normalized, structurally stable and rich in relationships, forcing it into MongoDB's embedded or referenced models can become awkward.
  • Alternative: traditional OLTP systems such as ERP and CRM remain the home turf of relational databases.

Traditional business-intelligence (BI) reporting:

  • Problem: workloads that need many complex JOINs and cross-table analysis. MongoDB's aggregation framework is powerful, but for multi-collection joins it is less intuitive and less efficient than SQL.
  • Alternative: a relational database or a dedicated data warehouse (such as Snowflake or BigQuery).

Applications that rely on full SQL:

  • Problem: many existing commercial tools and systems depend heavily on SQL. MongoDB offers a SQL-like query language, but it is not fully compatible.
  • Alternative: a database that supports standard SQL.

MongoDB Entity Classes

@Document(collection = "users")
@CompoundIndexes({
        @CompoundIndex(name = "email_username_idx", def = "{'email': 1, 'username': 1}")
})
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserDocument {

    @Id
    private String id;

    @Indexed(unique = true)
    private String username;

    @Indexed(unique = true)
    private String email;

    private String firstName;
    private String lastName;

    @Field("created_at")
    @CreatedDate
    private Date createdAt;

    @Field("updated_at")
    @LastModifiedDate
    private Date updatedAt;

    private Address address;

    @Version
    private Long version;

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Address {
        private String street;
        private String city;
        private String country;
        private String zipCode;

        @GeoSpatialIndexed(type = GeoSpatialIndexType.GEO_2DSPHERE)
        private double[] location;
    }
}

@Document(collection = "orders")
@Sharded(shardKey = {"customerId", "createdAt"})
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderDocument {

    @Id
    private String id;

    @Indexed
    private String customerId;

    @Indexed
    private String orderNumber;

    private BigDecimal amount;
    private String currency;

    @Field("order_status")
    private String status;

    private List<OrderItem> items;

    @Field("created_at")
    @CreatedDate
    private Date createdAt;

    @Field("updated_at")
    @LastModifiedDate
    private Date updatedAt;

    @Transient
    private BigDecimal totalAmount;

    @Data
    public static class OrderItem {
        private String productId;
        private String productName;
        private Integer quantity;
        private BigDecimal price;
    }

    // Recompute the derived total after the document is loaded
    @PostLoad
    public void calculateTotal() {
        this.totalAmount = items.stream()
                .map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())))
                .reduce(BigDecimal.ZERO, BigDecimal::add);
    }
}

MongoDB Repository

public interface UserRepository extends MongoRepository<UserDocument, String>, CustomUserRepository {

    Optional<UserDocument> findByUsername(String username);

    Optional<UserDocument> findByEmail(String email);

    List<UserDocument> findByFirstNameAndLastName(String firstName, String lastName);

    @Query("{'address.city': ?0}")
    List<UserDocument> findByCity(String city);

    @Query("{'createdAt': {$gte: ?0, $lte: ?1}}")
    List<UserDocument> findByCreatedAtBetween(Date start, Date end);

    @Query(value = "{'status': ?0}", fields = "{'username': 1, 'email': 1}")
    List<UserDocument> findUsersByStatus(String status, Pageable pageable);
}

public interface CustomUserRepository {

    List<UserDocument> findActiveUsers();

    long updateUserStatus(String userId, String status);

    List<UserAggregationResult> getUserStatistics();
}

public class CustomUserRepositoryImpl implements CustomUserRepository {

    @Autowired
    private MongoTemplate mongoTemplate;

    @Override
    public List<UserDocument> findActiveUsers() {
        Criteria criteria = Criteria.where("status").is("ACTIVE")
                .and("lastLoginDate").gte(LocalDateTime.now().minusMonths(1));

        Query query = new Query(criteria);
        return mongoTemplate.find(query, UserDocument.class);
    }

    @Override
    public long updateUserStatus(String userId, String status) {
        Query query = new Query(Criteria.where("id").is(userId));
        Update update = new Update()
                .set("status", status)
                .set("updatedAt", new Date());

        UpdateResult result = mongoTemplate.updateFirst(query, update, UserDocument.class);
        return result.getModifiedCount();
    }

    @Override
    public List<UserAggregationResult> getUserStatistics() {
        Aggregation aggregation = Aggregation.newAggregation(
                Aggregation.group("status")
                        .count().as("count")
                        .avg("loginCount").as("avgLogins")
                        .sum("loginCount").as("totalLogins"),
                Aggregation.sort(Sort.Direction.DESC, "count")
        );

        return mongoTemplate.aggregate(aggregation, "users", UserAggregationResult.class)
                .getMappedResults();
    }
}

public interface OrderRepository extends MongoRepository<OrderDocument, String> {

    List<OrderDocument> findByCustomerId(String customerId);

    // Pageable parameter so callers can page through a customer's orders
    List<OrderDocument> findByCustomerIdAndStatus(String customerId, String status, Pageable pageable);

    List<OrderDocument> findByCreatedAtBetween(Date start, Date end);

    long countByStatus(String status);

    @Query(value = "{'amount': {$gt: ?0}}", sort = "{'createdAt': -1}")
    List<OrderDocument> findOrdersAboveAmount(BigDecimal amount, Pageable pageable);

    @Query(value = "{}", count = true)
    long countAllOrders();
}

MongoDB Service Implementation

@Service
@Slf4j
public class MongoDBService {

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private OrderRepository orderRepository;

    @Transactional
    public UserDocument createUserWithTransaction(UserDocument user) {
        // Reject duplicate usernames
        if (userRepository.findByUsername(user.getUsername()).isPresent()) {
            throw new BusinessException("用户名已存在");
        }

        // Save the user
        UserDocument savedUser = userRepository.save(user);

        // Create the initial (welcome) order
        createWelcomeOrder(savedUser.getId());

        return savedUser;
    }

    @Transactional
    public void createWelcomeOrder(String userId) {
        OrderDocument order = OrderDocument.builder()
                .customerId(userId)
                .orderNumber("WELCOME_" + System.currentTimeMillis())
                .amount(BigDecimal.ZERO)
                .status("COMPLETED")
                .items(Collections.emptyList())
                .build();

        orderRepository.save(order);
    }

    public List<UserDocument> searchUsers(UserQuery query, Pageable pageable) {
        Criteria criteria = new Criteria();

        if (StringUtils.isNotBlank(query.getKeyword())) {
            criteria.orOperator(
                    Criteria.where("username").regex(query.getKeyword(), "i"),
                    Criteria.where("email").regex(query.getKeyword(), "i"),
                    Criteria.where("firstName").regex(query.getKeyword(), "i"),
                    Criteria.where("lastName").regex(query.getKeyword(), "i")
            );
        }

        if (StringUtils.isNotBlank(query.getCity())) {
            criteria.and("address.city").is(query.getCity());
        }

        if (query.getCreatedStart() != null && query.getCreatedEnd() != null) {
            criteria.and("createdAt").gte(query.getCreatedStart()).lte(query.getCreatedEnd());
        }

        Query mongoQuery = new Query(criteria);
        mongoQuery.with(pageable);
        mongoQuery.with(Sort.by(Sort.Direction.DESC, "createdAt"));

        return mongoTemplate.find(mongoQuery, UserDocument.class);
    }

    public List<OrderDocument> getOrdersByCustomer(String customerId, Pageable pageable) {
        return orderRepository.findByCustomerIdAndStatus(customerId, "ACTIVE", pageable);
    }

    public Map<String, Object> getOrderStatistics() {
        Aggregation aggregation = Aggregation.newAggregation(
                Aggregation.group("status")
                        .count().as("count")
                        .sum("amount").as("totalAmount"),
                Aggregation.sort(Sort.Direction.DESC, "count")
        );

        AggregationResults<Document> results =
                mongoTemplate.aggregate(aggregation, "orders", Document.class);

        return results.getMappedResults().stream()
                .collect(Collectors.toMap(
                        doc -> doc.getString("_id"),
                        doc -> (Object) doc
                ));
    }

    public void createIndexes() {
        // Text index for full-text search on names and email
        mongoTemplate.indexOps(UserDocument.class).ensureIndex(
                new TextIndexDefinitionBuilder()
                        .onField("firstName", 2.0f)
                        .onField("lastName", 2.0f)
                        .onField("email")
                        .build()
        );

        // TTL index: order documents expire automatically 30 days after creation
        mongoTemplate.indexOps(OrderDocument.class).ensureIndex(
                new Index().on("createdAt", Sort.Direction.ASC)
                        .expire(30, TimeUnit.DAYS)
        );
    }

    public void performBulkOperations() {
        List<Pair<Query, Update>> updates = new ArrayList<>();

        // Example bulk update: move all PENDING orders to PROCESSING
        Query query = new Query(Criteria.where("status").is("PENDING"));
        Update update = new Update()
                .set("status", "PROCESSING")
                .set("processedAt", new Date());
        updates.add(Pair.of(query, update));

        BulkOperations bulkOps = mongoTemplate.bulkOps(BulkOperations.BulkMode.ORDERED, OrderDocument.class);
        for (Pair<Query, Update> operation : updates) {
            bulkOps.updateMulti(operation.getFirst(), operation.getSecond());
        }

        BulkWriteResult result = bulkOps.execute();
        log.info("Bulk operation completed: {}", result.getModifiedCount());
    }
}

Integrating MongoDB with a Redis Cache

@Service
@Slf4j
public class CachedUserService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private RedisService redisService;

    private static final String USER_CACHE_PREFIX = "mongo:user:";
    private static final long CACHE_TTL = 3600; // one hour

    @Cacheable(value = "users", key = "#userId")
    public UserDocument getUserWithCache(String userId) {
        String cacheKey = USER_CACHE_PREFIX + userId;

        // Check the Redis cache first
        UserDocument user = (UserDocument) redisService.get(cacheKey);
        if (user != null) {
            return user;
        }

        // Cache miss: load from MongoDB
        user = userRepository.findById(userId)
                .orElseThrow(() -> new NotFoundException("用户不存在"));

        // Populate the cache
        redisService.set(cacheKey, user, CACHE_TTL);

        return user;
    }

    @CacheEvict(value = "users", key = "#userId")
    public void evictUserCache(String userId) {
        String cacheKey = USER_CACHE_PREFIX + userId;
        redisService.delete(cacheKey);
    }

    @Caching(evict = {
            @CacheEvict(value = "users", key = "#user.id"),
            @CacheEvict(value = "user_stats", allEntries = true)
    })
    public UserDocument updateUser(UserDocument user) {
        return userRepository.save(user);
    }
}

Integrating Elasticsearch Search

@Service
@Slf4j
public class UserSearchService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private ElasticsearchService elasticsearchService;

    @Async
    public void indexUserToElasticsearch(String userId) {
        try {
            UserDocument user = userRepository.findById(userId)
                    .orElseThrow(() -> new NotFoundException("用户不存在"));

            elasticsearchService.indexDocument("users", userId, user);
            log.info("User {} indexed to Elasticsearch", userId);
        } catch (Exception e) {
            log.error("Failed to index user {} to Elasticsearch: {}", userId, e.getMessage());
        }
    }

    public List<UserDocument> searchUsersInElasticsearch(String query) {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        if (StringUtils.isNotBlank(query)) {
            boolQuery.should(QueryBuilders.matchQuery("username", query).boost(2.0f));
            boolQuery.should(QueryBuilders.matchQuery("email", query).boost(1.5f));
            boolQuery.should(QueryBuilders.matchQuery("firstName", query));
            boolQuery.should(QueryBuilders.matchQuery("lastName", query));
        }

        sourceBuilder.query(boolQuery);
        sourceBuilder.size(50);
        sourceBuilder.sort("createdAt", SortOrder.DESC);

        return elasticsearchService.search("users", sourceBuilder, UserDocument.class);
    }
}

Integrating RocketMQ Messaging

@Component
@Slf4j
@RocketMQMessageListener(
        topic = "USER_TOPIC",
        consumerGroup = "mongo-user-consumer-group")
public class UserMessageListener implements RocketMQListener<MessageExt> {

    @Autowired
    private MongoDBService mongoDBService;

    @Autowired
    private UserSearchService userSearchService;

    @Autowired
    private ElasticsearchService elasticsearchService;

    @Override
    public void onMessage(MessageExt message) {
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            UserEvent userEvent = JSON.parseObject(messageBody, UserEvent.class);

            switch (userEvent.getType()) {
                case "USER_CREATED":
                    handleUserCreated(userEvent);
                    break;
                case "USER_UPDATED":
                    handleUserUpdated(userEvent);
                    break;
                case "USER_DELETED":
                    handleUserDeleted(userEvent);
                    break;
                default:
                    log.warn("Unknown user event type: {}", userEvent.getType());
            }
        } catch (Exception e) {
            log.error("Failed to process user message: {}", e.getMessage());
        }
    }

    private void handleUserCreated(UserEvent event) {
        // Index to Elasticsearch asynchronously
        userSearchService.indexUserToElasticsearch(event.getUserId());
        // Other business processing
        log.info("User created: {}", event.getUserId());
    }

    private void handleUserUpdated(UserEvent event) {
        // Refresh cache and search index
        userSearchService.indexUserToElasticsearch(event.getUserId());
        log.info("User updated: {}", event.getUserId());
    }

    private void handleUserDeleted(UserEvent event) {
        // Remove the document from Elasticsearch
        elasticsearchService.deleteDocument("users", event.getUserId());
        log.info("User deleted: {}", event.getUserId());
    }
}

MongoDB Unit Tests

@SpringBootTest
@ActiveProfiles("test")
@Slf4j
public class MongoDBServiceTest {

    @Autowired
    private MongoDBService mongoDBService;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    @BeforeEach
    void setUp() {
        mongoTemplate.dropCollection(UserDocument.class);
        mongoTemplate.dropCollection(OrderDocument.class);
    }

    @Test
    @Transactional
    public void testCreateUserWithTransaction() {
        UserDocument user = UserDocument.builder()
                .username("testuser")
                .email("test@example.com")
                .firstName("Test")
                .lastName("User")
                .build();

        UserDocument savedUser = mongoDBService.createUserWithTransaction(user);

        assertNotNull(savedUser.getId());
        assertEquals("testuser", savedUser.getUsername());

        // Verify that the welcome order was created
        List<OrderDocument> orders = orderRepository.findByCustomerId(savedUser.getId());
        assertEquals(1, orders.size());
        assertEquals("COMPLETED", orders.get(0).getStatus());
    }

    @Test
    public void testSearchUsers() {
        // Prepare test data
        UserDocument user1 = createTestUser("user1", "user1@example.com", "北京");
        UserDocument user2 = createTestUser("user2", "user2@example.com", "上海");
        userRepository.saveAll(Arrays.asList(user1, user2));

        // Exercise the search
        UserQuery query = new UserQuery();
        query.setKeyword("user1");
        query.setCity("北京");

        List<UserDocument> results = mongoDBService.searchUsers(query, PageRequest.of(0, 10));
        assertEquals(1, results.size());
        assertEquals("user1", results.get(0).getUsername());
    }

    @Test
    public void testBulkOperations() {
        // Prepare PENDING orders so the bulk update has something to process
        List<OrderDocument> orders = IntStream.range(0, 100)
                .mapToObj(i -> OrderDocument.builder()
                        .customerId("customer" + i)
                        .orderNumber("ORDER_" + i)
                        .amount(BigDecimal.TEN)
                        .status("PENDING")
                        .items(Collections.emptyList())
                        .build())
                .collect(Collectors.toList());
        orderRepository.saveAll(orders);

        // Run the bulk update (PENDING -> PROCESSING)
        mongoDBService.performBulkOperations();

        // Verify the result
        long processedCount = orderRepository.countByStatus("PROCESSING");
        assertTrue(processedCount > 0);
    }

    private UserDocument createTestUser(String username, String email, String city) {
        return UserDocument.builder()
                .username(username)
                .email(email)
                .firstName("Test")
                .lastName("User")
                .address(new UserDocument.Address("Test Street", city, "China", "100000",
                        new double[]{116.4, 39.9}))
                .build();
    }
}

Integration Tests

@SpringBootTest
@ActiveProfiles("integration")
@Slf4j
public class MongoDBIntegrationTest {

    @Autowired
    private MongoDBService mongoDBService;

    @Autowired
    private CachedUserService cachedUserService;

    @Autowired
    private UserSearchService userSearchService;

    @Autowired
    private RedisService redisService;

    @Test
    public void testMongoDBWithRedisCache() {
        UserDocument user = createTestUser();
        UserDocument saved = mongoDBService.createUserWithTransaction(user);

        // First lookup (hits MongoDB and populates the cache)
        UserDocument result1 = cachedUserService.getUserWithCache(saved.getId());
        assertNotNull(result1);

        // Second lookup (served from the cache)
        UserDocument result2 = cachedUserService.getUserWithCache(saved.getId());
        assertNotNull(result2);

        // Verify the cache entry exists
        String cacheKey = "mongo:user:" + saved.getId();
        assertTrue(redisService.exists(cacheKey));
    }

    @Test
    public void testMongoDBWithElasticsearch() throws InterruptedException {
        UserDocument user = createTestUser();
        UserDocument saved = mongoDBService.createUserWithTransaction(user);

        // Trigger asynchronous indexing and wait for it to finish
        userSearchService.indexUserToElasticsearch(saved.getId());
        Thread.sleep(1000);

        // Search in Elasticsearch
        List<UserDocument> results = userSearchService.searchUsersInElasticsearch(saved.getUsername());
        assertFalse(results.isEmpty());
        assertEquals(saved.getUsername(), results.get(0).getUsername());
    }

    private UserDocument createTestUser() {
        return UserDocument.builder()
                .username("integration_test")
                .email("integration@example.com")
                .firstName("Integration")
                .lastName("Test")
                .build();
    }
}

Performance Tests

@SpringBootTest
@ActiveProfiles("performance")
@Slf4j
public class MongoDBPerformanceTest {

    @Autowired
    private MongoDBService mongoDBService;

    @Autowired
    private UserRepository userRepository;

    @Test
    public void testWritePerformance() {
        int batchSize = 1000;
        long startTime = System.currentTimeMillis();

        for (int i = 0; i < batchSize; i++) {
            UserDocument user = UserDocument.builder()
                    .username("perfuser" + i)
                    .email("perf" + i + "@example.com")
                    .firstName("Performance")
                    .lastName("Test")
                    .build();
            userRepository.save(user);
        }

        long endTime = System.currentTimeMillis();
        long duration = endTime - startTime;

        log.info("Inserted {} documents in {} ms, throughput: {}/s",
                batchSize, duration, (batchSize * 1000) / duration);
        assertTrue(duration < 5000, "Write performance too slow");
    }

    @Test
    public void testReadPerformance() {
        int queryCount = 1000;
        long startTime = System.currentTimeMillis();

        for (int i = 0; i < queryCount; i++) {
            userRepository.findByUsername("perfuser" + (i % 100));
        }

        long endTime = System.currentTimeMillis();
        long duration = endTime - startTime;

        log.info("Executed {} queries in {} ms, throughput: {}/s",
                queryCount, duration, (queryCount * 1000) / duration);
        assertTrue(duration < 3000, "Read performance too slow");
    }
}

Health Checks and Monitoring

@Component
@Slf4j
public class MongoDBHealthChecker {

    @Autowired
    private MongoTemplate mongoTemplate;

    @Scheduled(fixedRate = 30000)
    public void checkMongoDBHealth() {
        try {
            // Simple ping command
            Document result = mongoTemplate.executeCommand("{ ping: 1 }");
            if (result.getDouble("ok") != 1.0) {
                log.warn("MongoDB health check failed");
            }

            // Replica-set status
            Document replStatus = mongoTemplate.executeCommand("{ replSetGetStatus: 1 }");
            log.debug("MongoDB replica set status: {}", replStatus.toJson());
        } catch (Exception e) {
            log.error("MongoDB health check error: {}", e.getMessage());
        }
    }

    public Map<String, Object> getMongoDBStats() {
        try {
            Document dbStats = mongoTemplate.executeCommand("{ dbStats: 1 }");
            Document serverStatus = mongoTemplate.executeCommand("{ serverStatus: 1 }");

            Map<String, Object> stats = new HashMap<>();
            stats.put("dbStats", dbStats);
            stats.put("serverStatus", serverStatus);
            return stats;
        } catch (Exception e) {
            log.error("Failed to get MongoDB stats: {}", e.getMessage());
            return Collections.emptyMap();
        }
    }
}