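RocketMQ in practice: DLedger cluster, ACL 1.0 enabled, rocketmq-mqtt integrated
Background
A project at my company originally ran on the full Alibaba Cloud stack. Later, to cut costs, every component was moved on-premises. This post records what I found along the way (I had barely used RocketMQ before, mostly RabbitMQ).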
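I started by going to GitHub and downloading the latest releases: rocketmq-5.3.3, rocketmq-dashboard-2.0.0, and rocketmq-mqtt-1.0.2. Cluster deployment, MQTT integration, and testing all worked, and I was happy. Then I turned on permission checks and nothing I tried worked. I eventually learned that 5.x introduces ACL 2.0, which addresses security problems in ACL 1.0. So I configured permissions following the ACL 2.0 guide. The command line was fine, but rocketmq-dashboard no longer enforced any permissions, a test client was not checked either, and with MQTT the connection succeeded but sending a message failed with an error between MQTT and RocketMQ that I could not make sense of, even with the source code in hand.
That wasted several days. In the end I went back to the older versions. (If anyone has gotten the versions above to work together, please leave a comment so we can compare notes.)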
Environment
linux 64bit
jdk 1.8
rocketmq-4.9.8
rocketmq-dashboard-1.0.0
rocketmq-mqtt-1.0.1
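Requirements
The server connects to RocketMQ and the client connects over MQTT; the two sides publish and subscribe to each other's messages.
Server addresses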
- 10.18.6.94
- 10.18.6.95
- 10.18.6.96
RocketMQ deployment
Download
wget https://dist.apache.org/repos/dist/release/rocketmq/4.9.8/rocketmq-all-4.9.8-bin-release.zip
Set up the environment
Add the environment variables below; both RocketMQ and MQTT need them at startup.
# vi .bash_profile
export ROCKETMQ_HOME=/home/test/rocketmq-all-4.9.8-bin-release
export PATH=$ROCKETMQ_HOME/bin:$PATH
export JAVA_HOME=/usr/lib/jvm/jdk-1.8-oracle-x64
Reload
source .bash_profile
Configuration
Edit the three configuration files in the conf/dledger directory: broker-n0.conf, broker-n1.conf, and broker-n2.conf.
# cat broker-n0.conf
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
# Enable ACL
aclEnable=true
# Specify the ACL file explicitly, then also create an acl directory under conf (look up the reason yourself)
aclConfigFile=/home/test/rocketmq-all-4.9.8-bin-release/conf/plain_acl.yml
# Enable MQTT
enableLmq=true
enableMultiDispatch=true
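The only change to the three files was appending the following at the end:
# Enable ACL
aclEnable=true
# Specify the ACL file explicitly, then also create an acl directory under conf (look up the reason yourself)
aclConfigFile=/home/test/rocketmq-all-4.9.8-bin-release/conf/plain_acl.yml
# Enable MQTT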
enableLmq=true
enableMultiDispatch=true
JVM parameter changes (optional)
The default JVM settings are too large, so I reduced them.
vi bin/dledger/fast-try.sh
startNameserver() {
    export JAVA_OPT_EXT=" -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=90m -XX:MaxMetaspaceSize=180m "
    nohup bin/mqnamesrv &
}
startBroker() {
    export JAVA_OPT_EXT=" -Xms256m -Xmx256m "
    conf_name=$1
    nohup bin/mqbroker -c $conf_name &
}
Start
Once every file has been edited, start the cluster (the startup script is worth a read).
sh bin/dledger/fast-try.sh start
Check
mqadmin clusterList -n 127.0.0.1:9876
Result:
Dashboard deployment
Download
wget https://github.com/apache/rocketmq-dashboard/archive/refs/tags/rocketmq-dashboard-1.0.0.zip
Build
After unzipping, enter the directory and run the following command to build the project
mvn clean package -Dmaven.test.skip=true
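Finally, create a dashboard directory, copy application.properties out of the source tree's resources directory, put it next to the jar, edit it, and start the application.
Configuration file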
cat application.properties
server.address=0.0.0.0
server.port=18080
### SSL setting
#server.ssl.key-store=classpath:rmqcngkeystore.jks
#server.ssl.key-store-password=rocketmq
#server.ssl.keyStoreType=PKCS12
#server.ssl.keyAlias=rmqcngkey
#spring.application.index=true
spring.application.name=rocketmq-dashboard
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.level.root=INFO
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=10.18.6.94:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
#rocketmq.config.isVIPChannel=
#timeout for mqadminExt, default 5000ms
#rocketmq.config.timeoutMillis=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
#set the message track trace topic if you don't want use the default one
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket
#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
rocketmq.config.loginRequired=true
#set the accessKey and secretKey if you used acl
rocketmq.config.accessKey=RocketMQsdfadf
rocketmq.config.secretKey=12345678
rocketmq.config.useTLS=false
Above, I changed the ACL credentials and the login settings.
Start command
nohup java -jar rocketmq-dashboard-1.0.0.jar &
Result
http://10.18.6.96:18080/#/login
Account: admin/admin
The accounts can be configured in the users.properties file under the resources directory.
rocketmq-mqtt deployment
Download and build
https://github.com/apache/rocketmq-mqtt/archive/refs/tags/rocketmq-mqtt-1.0.1.zip
mvn -Prelease-all -DskipTests clean install -U
Under rocketmq-mqtt-rocketmq-mqtt-1.0.1\distribution\target there is rocketmq-mqtt-1.0.1.zip; unzip it and it is ready to use.
Do this on all three servers; only one is shown here.
Configuration
cat meta.conf
selfAddress=10.18.6.94:10000
membersAddress=10.18.6.94:10000,10.18.6.95:10000,10.18.6.96:10000
cat service.conf
username=LTAI5tDKjs3fbeMy3gCRv1Ye
secretKey=3Lbuqu5gooZLhZeNfaKHVD73vsqxzf
NAMESRV_ADDR=10.18.6.94:9876
eventNotifyRetryTopic=%RETRY%RMQ_SYS_EVENT_NOTIFY
clientRetryTopic=%RETRY%RMQ_MQTT_CLIENT
metaAddr=10.18.6.94:10000,10.18.6.95:10000,10.18.6.96:10000
Note:
service.conf is identical on every server. In meta.conf only selfAddress differs: it is set to that server's own IP.
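Since only selfAddress changes from host to host, the three meta.conf files can be derived from the server list. The following is a minimal sketch of that idea, not part of rocketmq-mqtt: the class name MetaConfSketch and the method metaConf are hypothetical, and port 10000 is taken from the files shown above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: renders the meta.conf content for one server.
class MetaConfSketch {

    // selfIp is the host the file is written for; allIps is the full member list.
    static String metaConf(String selfIp, List<String> allIps, int port) {
        StringBuilder members = new StringBuilder();
        for (String ip : allIps) {
            if (members.length() > 0) {
                members.append(',');
            }
            members.append(ip).append(':').append(port);
        }
        return "selfAddress=" + selfIp + ":" + port + "\n"
                + "membersAddress=" + members + "\n";
    }

    public static void main(String[] args) {
        List<String> ips = Arrays.asList("10.18.6.94", "10.18.6.95", "10.18.6.96");
        for (String ip : ips) {
            // Print the file content that would go on each server.
            System.out.println("# meta.conf on " + ip);
            System.out.print(metaConf(ip, ips, 10000));
        }
    }
}
```

The membersAddress line comes out the same for every host, which matches the note above.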
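Additional RocketMQ configuration
Run the following commands on the server where RocketMQ is installed (the rocketmq-mqtt project page linked in the references explains why):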
sh mqadmin updatetopic -c RaftCluster -t topicB -n 10.18.6.94:9876
sh mqadmin updateKvConfig -s LMQ -k LMQ_CONNECT_NODES -v 10.18.6.94 -n 10.18.6.94:9876
sh mqadmin updateKvConfig -s LMQ -k ALL_FIRST_TOPICS -v topicB -n 10.18.6.94:9876
sh mqadmin updateKvConfig -s LMQ -k topicB -v topicB/+ -n 10.18.6.94:9876
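The value topicB/+ in the last command is written in MQTT topic-filter syntax, where + stands for exactly one topic level. How rocketmq-mqtt consumes this value internally is not covered here. As a reminder of the standard MQTT matching rules only, here is a simplified sketch; the class TopicFilterSketch is hypothetical and it ignores special cases such as topics beginning with $.

```java
// Simplified MQTT topic-filter matching: '+' matches one level, '#' matches the rest.
class TopicFilterSketch {

    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard: everything from here on matches
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        // Without '#', filter and topic must have the same number of levels.
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("topicB/+", "topicB/r1"));     // true
        System.out.println(matches("topicB/+", "topicB/r/wc"));   // false
        System.out.println(matches("topicB/r/+", "topicB/r/wc")); // true
    }
}
```

Under these rules topicB/+ covers only one level below topicB, which is why the test code later subscribes to topicB/r/+ separately for two-level subtopics.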
Start MQTT
cd bin
sh meta.sh start
sh mqtt.sh start
Testing
The credentials used here can be found in plain_acl.yml under the RocketMQ configuration directory.
RocketMQ producer
package org.apache.rocketmq.mqtt.example;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class RocketMQProducer {
    private static DefaultMQProducer producer;
    private static String firstTopic = "topicB";
    private static String recvClientId = "recv01";

    public static void main(String[] args) throws Exception {
        // Replace with your accessKey and secretKey
        String accessKey = "RocketMQ";
        String secretKey = "12345678";
        // Create the ACL hook
        AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
        //Instantiate with a producer group name.
        producer = new DefaultMQProducer(null, "PID_TEST", aclHook);
        // producer = new DefaultMQProducer("PID_TEST");
        // Specify name server addresses.
        producer.setNamesrvAddr("10.18.6.94:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 2; i++) {
            //Create a message instance, specifying topic, tag and message body.
            //Call send message to deliver message to one of brokers.
            try {
                sendMessage(i);
                Thread.sleep(1000);
                sendWithWildcardMessage(i);
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }

    private static void setLmq(Message msg, Set<String> queues) {
        msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
            StringUtils.join(
                queues.stream()
                    .map(s -> StringUtils.replace(s, "/", "%"))
                    .map(s -> MixAll.LMQ_PREFIX + s)
                    .collect(Collectors.toSet()),
                MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
    }

    private static void sendMessage(int i) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
        Message msg = new Message(firstTopic,
            "MQ2MQTT",
            ("MQ_" + System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));
        String secondTopic = "/r1";
        setLmq(msg, new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, secondTopic))));
        SendResult sendResult = producer.send(msg);
        System.out.println(now() + "sendMessage: " + new String(msg.getBody()));
    }

    private static void sendWithWildcardMessage(int i) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
        Message msg = new Message(firstTopic,
            "MQ2MQTT",
            ("MQwc_" + System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));
        String secondTopic = "/r/wc";
        Set<String> lmqSet = new HashSet<>();
        lmqSet.add(TopicUtils.wrapLmq(firstTopic, secondTopic));
        lmqSet.addAll(mapWildCardLmq(firstTopic, secondTopic));
        setLmq(msg, lmqSet);
        SendResult sendResult = producer.send(msg);
        System.out.println(now() + "sendWcMessage: " + new String(msg.getBody()));
    }

    private static Set<String> mapWildCardLmq(String firstTopic, String secondTopic) {
        // todo by yourself
        return new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, "/r/+")));
    }

    private static String now() {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        return sf.format(new Date()) + "\t";
    }
}
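The setLmq method above is where a RocketMQ message gets routed to MQTT subscribers: each MQTT topic is turned into a light message queue (LMQ) name and the names are joined into one message property. The following standalone sketch reproduces only that string transformation without any RocketMQ dependency. The constants are assumed to mirror MixAll.LMQ_PREFIX and MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, and the input strings stand in for whatever TopicUtils.wrapLmq returns, whose exact form (for example a trailing separator) should be checked against the library. The class LmqNameSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Standalone illustration of the naming step in setLmq().
class LmqNameSketch {
    // Assumption: same values as MixAll.LMQ_PREFIX and MixAll.MULTI_DISPATCH_QUEUE_SPLITTER.
    static final String LMQ_PREFIX = "%LMQ%";
    static final String SPLITTER = ",";

    // Replaces every "/" with "%" and adds the LMQ prefix.
    static String toLmqQueue(String mqttTopic) {
        return LMQ_PREFIX + mqttTopic.replace("/", "%");
    }

    // Joins the LMQ names into the value stored in the multi-dispatch property.
    static String multiDispatchValue(Set<String> mqttTopics) {
        StringBuilder sb = new StringBuilder();
        for (String topic : mqttTopics) {
            if (sb.length() > 0) {
                sb.append(SPLITTER);
            }
            sb.append(toLmqQueue(topic));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // LinkedHashSet keeps insertion order so the printed value is stable.
        Set<String> topics = new LinkedHashSet<>(Arrays.asList("topicB/r/wc", "topicB/r/+"));
        System.out.println(toLmqQueue("topicB/r1"));     // %LMQ%topicB%r1
        System.out.println(multiDispatchValue(topics));  // %LMQ%topicB%r%wc,%LMQ%topicB%r%+
    }
}
```

The wildcard entry is a literal queue name here, which is why the producer has to add topicB/r/+ itself in mapWildCardLmq for wildcard subscribers to receive the message.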
MQTT consumer
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.mqtt.example;

import org.apache.rocketmq.mqtt.common.util.HmacSHA1Util;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class MqttConsumer {
    public static void main(String[] args) throws MqttException, NoSuchAlgorithmException, InvalidKeyException {
        String brokerUrl = "tcp://10.18.6.94:1883";
        String firstTopic = "topicB";
        MemoryPersistence memoryPersistence = new MemoryPersistence();
        String recvClientId = "recv01";
        MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(recvClientId);
        MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);
        mqttClient.setTimeToWait(5000L);
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println(recvClientId + " connect success to " + serverURI);
                try {
                    final String topicFilter[] = {firstTopic + "/r1", firstTopic + "/r/+", firstTopic + "/r2"};
                    final int[] qos = {1, 1, 2};
                    mqttClient.subscribe(topicFilter, qos);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                try {
                    String payload = new String(mqttMessage.getPayload());
                    String[] ss = payload.split("_");
                    System.out.println(now() + "receive:" + topic + "," + payload);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            }
        });
        try {
            mqttClient.connect(mqttConnectOptions);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("connect fail");
        }
    }

    private static MqttConnectOptions buildMqttConnectOptions(String clientId) throws NoSuchAlgorithmException, InvalidKeyException {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setKeepAliveInterval(60);
        connOpts.setAutomaticReconnect(true);
        connOpts.setMaxInflight(10000);
        connOpts.setUserName("LTAI5tDKjs3fbeMy3gCRv1Ye");
        connOpts.setPassword(HmacSHA1Util.macSignature(clientId, "3Lbuqu5gooZLhZeNfaKHVD73vsqxzf").toCharArray());
        return connOpts;
    }

    private static String now() {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        return sf.format(new Date()) + "\t";
    }
}
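The MQTT password in buildMqttConnectOptions is not the secret key itself but a signature of the client ID computed with the secretKey from service.conf. For clients that cannot pull in the rocketmq-mqtt jar, the sketch below shows one way to compute such a value with the JDK alone. It assumes that HmacSHA1Util.macSignature is HMAC-SHA1 over the UTF-8 client ID with Base64 output; that assumption should be verified against the library before relying on it. The class MqttPasswordSketch is hypothetical.

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;

// JDK-only stand-in for the password computation; the algorithm choice is an assumption.
class MqttPasswordSketch {

    // Signs the client ID with the shared secret and Base64-encodes the 20-byte digest.
    static String sign(String clientId, String secretKey) {
        try {
            Mac mac = Mac.getInstance("HmacSHA1");
            mac.init(new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), "HmacSHA1"));
            byte[] digest = mac.doFinal(clientId.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(digest);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA1 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Same client ID and secret as the consumer above.
        System.out.println(sign("recv01", "3Lbuqu5gooZLhZeNfaKHVD73vsqxzf"));
    }
}
```

Because the signature depends on the client ID, every MQTT client needs its own password even though all of them share the same username and secret.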
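Summary
At this point the deployment meets the requirements and the tests pass. Later I will write about some of the pitfalls of moving an Alibaba Cloud project to a locally deployed RocketMQ.
References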
https://github.com/apache/rocketmq-mqtt/tree/rocketmq-mqtt-1.0.1
https://github.com/apache/rocketmq
https://github.com/apache/rocketmq-dashboard/tree/rocketmq-dashboard-1.0.0
https://rocketmq.apache.org/zh/docs/quickStart/01quickstart
https://rocketmq-learning.com/faq/ons-user-question-history16379/?source=faq