消息中间件(RocketMQ+RabbitMQ+Kafka)
消息中间件适用场景
RocketMQ应用 - 电商订单系统
订单创建、支付、发货全流程
①、环境准备和部署
# 下载RocketMQ
wget https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
unzip rocketmq-all-4.9.4-bin-release.zip
cd rocketmq-4.9.4/

# 启动NameServer
nohup sh bin/mqnamesrv &

# 修改Broker配置(conf/broker.conf)
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

# 启动Broker
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
# 控制台安装
docker pull apacherocketmq/rocketmq-console:2.0.0
# 注意:容器内的 localhost 指向容器自身,namesrv.addr 需填写宿主机可达的 IP
docker run -d --name rocketmq-console \
  -e "JAVA_OPTS=-Drocketmq.namesrv.addr=localhost:9876" \
  -p 8080:8080 \
  apacherocketmq/rocketmq-console:2.0.0
②、订单生产者实现:
- DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
- Message createMsg = buildOrderMessage(order, ORDER_CREATE_TAG);
- SendResult createResult = producer.send(createMsg);
public class OrderProducer {
    private static final String ORDER_TOPIC = "order_topic";
    private static final String ORDER_CREATE_TAG = "create";
    private static final String ORDER_PAY_TAG = "pay";
    private static final String ORDER_SHIP_TAG = "ship";

    public static void main(String[] args) throws Exception {
        // 1. 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.setRetryTimesWhenSendFailed(3); // 发送失败重试次数
        producer.start();

        // 队列选择器:确保同一订单的消息进入同一队列,保证分区有序
        MessageQueueSelector orderQueueSelector = new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Long orderId = (Long) arg;
                return mqs.get((int) (orderId % mqs.size()));
            }
        };

        // 2. 模拟订单创建
        Order order = createOrder();
        Message createMsg = buildOrderMessage(order, ORDER_CREATE_TAG);
        SendResult createResult = producer.send(createMsg, orderQueueSelector, order.getOrderId());
        System.out.println("订单创建消息发送结果: " + createResult);

        // 3. 模拟订单支付(5秒后)
        TimeUnit.SECONDS.sleep(5);
        order.setStatus("PAID");
        Message payMsg = buildOrderMessage(order, ORDER_PAY_TAG);
        SendResult payResult = producer.send(payMsg, orderQueueSelector, order.getOrderId());
        System.out.println("订单支付消息发送结果: " + payResult);

        // 4. 模拟订单发货(10秒后)
        TimeUnit.SECONDS.sleep(10);
        order.setStatus("SHIPPED");
        Message shipMsg = buildOrderMessage(order, ORDER_SHIP_TAG);
        SendResult shipResult = producer.send(shipMsg, orderQueueSelector, order.getOrderId());
        System.out.println("订单发货消息发送结果: " + shipResult);

        producer.shutdown();
    }

    private static Message buildOrderMessage(Order order, String tag) throws Exception {
        return new Message(ORDER_TOPIC,
                tag,
                order.getOrderId().toString(),
                JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));
    }

    private static Order createOrder() {
        Order order = new Order();
        order.setOrderId(System.currentTimeMillis());
        order.setUserId(10001L);
        order.setAmount(new BigDecimal("599.99"));
        order.setStatus("CREATED");
        order.setCreateTime(new Date());
        return order;
    }
}
③、消费者
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
- consumer.subscribe(ORDER_TOPIC, "create || pay || ship");
public class OrderConsumer {
    private static final String ORDER_TOPIC = "order_topic";

    public static void main(String[] args) throws Exception {
        // 1. 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeThreadMin(5);
        consumer.setConsumeThreadMax(10);
        consumer.setConsumeMessageBatchMaxSize(10); // 批量消费

        // 2. 订阅主题和标签
        consumer.subscribe(ORDER_TOPIC, "create || pay || ship");

        // 3. 注册消息监听器(使用MessageListenerOrderly保证分区有序)
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
                        System.out.printf("收到订单消息: 订单ID=%d 状态=%s 标签=%s%n",
                                order.getOrderId(), order.getStatus(), msg.getTags());
                        // 根据消息标签处理不同业务
                        switch (msg.getTags()) {
                            case "create":
                                handleOrderCreate(order);
                                break;
                            case "pay":
                                handleOrderPay(order);
                                break;
                            case "ship":
                                handleOrderShip(order);
                                break;
                        }
                    } catch (Exception e) {
                        System.err.println("处理订单消息失败: " + e.getMessage());
                        // 当前消息处理失败,暂停当前队列
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("订单消费者已启动");
    }

    private static void handleOrderCreate(Order order) {
        // 订单创建业务逻辑
        System.out.println("处理订单创建: " + order.getOrderId());
    }

    private static void handleOrderPay(Order order) {
        // 订单支付业务逻辑
        System.out.println("处理订单支付: " + order.getOrderId());
    }

    private static void handleOrderShip(Order order) {
        // 订单发货业务逻辑
        System.out.println("处理订单发货: " + order.getOrderId());
    }
}
顺序消息保障:
- 使用MessageListenerOrderly保证分区有序
- 相同订单ID的消息发送到同一队列
- 消费失败时暂停当前队列
消息可靠性保障:
// 生产者配置
producer.setRetryTimesWhenSendAsyncFailed(3); // 异步发送失败重试
producer.setRetryTimesWhenSendFailed(3); // 同步发送失败重试

// Broker配置
flushDiskType=SYNC_FLUSH   # 同步刷盘(更高可靠性)
brokerRole=SYNC_MASTER     # 同步复制(更高可靠性)
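在上述配置之外,生产者侧还可以校验同步发送的返回状态,并为异步发送注册回调做补偿。下面是一个简化示意(沿用前文已启动的 producer,msg 为示例消息变量):
// 同步发送:检查返回状态
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
    // FLUSH_DISK_TIMEOUT / FLUSH_SLAVE_TIMEOUT / SLAVE_NOT_AVAILABLE:消息已写入,但刷盘或主从同步未在超时内完成
    System.err.println("发送状态异常,按业务决定是否补偿: " + result.getSendStatus());
}

// 异步发送:在回调中处理失败
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("异步发送成功: " + sendResult.getMsgId());
    }

    @Override
    public void onException(Throwable e) {
        // 记录日志并重发,或落库后由补偿任务处理(示意)
        System.err.println("异步发送失败: " + e.getMessage());
    }
});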
消息积压处理方案:
// 消费者配置
consumer.setConsumeThreadMax(20); // 增加消费线程
consumer.setPullBatchSize(32); // 每次拉取消息数
consumer.setConsumeMessageBatchMaxSize(10); // 批量消费数量

// 临时解决方案
// 1. 增加消费者实例
// 2. 创建新的消费者组从头消费
// 3. 跳过非关键消息
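针对上面第2条"创建新的消费者组从头消费",一个简化示意如下(新消费者组名为示例;CONSUME_FROM_FIRST_OFFSET 只在该组首次启动、Broker 上尚无位点记录时生效):
// 新建消费者组,从最早位点追赶积压消息
DefaultMQPushConsumer catchUpConsumer = new DefaultMQPushConsumer("order_consumer_group_catchup");
catchUpConsumer.setNamesrvAddr("localhost:9876");
catchUpConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
catchUpConsumer.subscribe("order_topic", "*");
// 注册监听器后 start(),处理逻辑可复用前文 OrderConsumer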
RabbitMQ应用 - 物流通知系统
物流状态变更通知
①、环境准备与部署
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin123 \
  rabbitmq:3.9-management
②、物流状态生产者
- ConnectionFactory factory = new ConnectionFactory();
- Connection connection = factory.newConnection()
- Channel channel = connection.createChannel()
- channel.basicPublish()
public class LogisticsProducer {
    private static final String EXCHANGE_NAME = "logistics.direct";
    private static final String[] ROUTING_KEYS = {"logistics.created", "logistics.shipped", "logistics.delivered"};

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明持久化直连交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

            // 模拟物流状态变更
            for (int i = 1; i <= 10; i++) {
                Logistics logistics = new Logistics(
                        "LOG" + System.currentTimeMillis(),
                        "ORDER" + i,
                        "CREATED",
                        new Date());

                // 初始状态
                publishLogisticsStatus(channel, logistics, "logistics.created");

                // 5秒后变更为已发货
                TimeUnit.SECONDS.sleep(5);
                logistics.setStatus("SHIPPED");
                publishLogisticsStatus(channel, logistics, "logistics.shipped");

                // 10秒后变更为已送达
                TimeUnit.SECONDS.sleep(10);
                logistics.setStatus("DELIVERED");
                publishLogisticsStatus(channel, logistics, "logistics.delivered");
            }
        }
    }

    private static void publishLogisticsStatus(Channel channel, Logistics logistics, String routingKey) throws Exception {
        String message = JSON.toJSONString(logistics);
        channel.basicPublish(EXCHANGE_NAME,
                routingKey,
                MessageProperties.PERSISTENT_TEXT_PLAIN, // 持久化消息
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    }
}
③、消费者
- channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);
- channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
public class LogisticsConsumer {
    private static final String EXCHANGE_NAME = "logistics.direct";
    private static final String QUEUE_NAME = "logistics.notification.queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明交换机和队列
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 绑定路由键
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "logistics.created");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "logistics.shipped");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "logistics.delivered");

        System.out.println(" [*] Waiting for logistics messages. To exit press CTRL+C");

        // 每次预取10条消息
        channel.basicQos(10);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                String message = new String(delivery.getBody(), "UTF-8");
                Logistics logistics = JSON.parseObject(message, Logistics.class);
                System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + logistics + "'");

                // 根据路由键处理不同状态
                switch (delivery.getEnvelope().getRoutingKey()) {
                    case "logistics.created":
                        handleCreated(logistics);
                        break;
                    case "logistics.shipped":
                        handleShipped(logistics);
                        break;
                    case "logistics.delivered":
                        handleDelivered(logistics);
                        break;
                }

                // 手动确认消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            } catch (Exception e) {
                System.err.println(" [x] Error processing message: " + e.getMessage());
                // 拒绝消息并重新入队
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
            }
        };

        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
    }

    private static void handleCreated(Logistics logistics) {
        System.out.println("处理物流创建: 发送创建通知短信/邮件");
        // 实际业务逻辑...
    }

    private static void handleShipped(Logistics logistics) {
        System.out.println("处理物流发货: 更新订单状态为已发货");
        // 实际业务逻辑...
    }

    private static void handleDelivered(Logistics logistics) {
        System.out.println("处理物流送达: 完成订单结算");
        // 实际业务逻辑...
    }
}
消息可靠性保障
// 生产者配置
channel.confirmSelect(); // 开启发布确认
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) {
        System.out.println("消息已确认: " + deliveryTag);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) {
        System.err.println("消息未确认: " + deliveryTag);
        // 重发逻辑...
    }
});

// 消息持久化
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props, message.getBytes());
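除了异步的 ConfirmListener,也可以在发布后同步等待确认,实现更简单但吞吐较低。下面是一个简化示意(沿用前文的 channel 与 EXCHANGE_NAME,message 为示例变量):
channel.confirmSelect();
channel.basicPublish(EXCHANGE_NAME, "logistics.created",
        MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
channel.waitForConfirmsOrDie(5000); // 最多等待5秒,未确认或超时会抛异常,由调用方重发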
消息积压处理
// 消费者配置
channel.basicQos(20); // 增加预取数量

// 解决方案:
// 1. 增加消费者实例
// 2. 创建死信队列处理失败消息
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "logistics.dlx");
args.put("x-dead-letter-routing-key", "logistics.dead");
channel.queueDeclare(QUEUE_NAME, true, false, false, args);
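上面只是在业务队列上配置了死信参数,死信交换机和死信队列本身也需要声明并绑定。下面是一个简化示意(队列名 logistics.dead.queue 为示例):
// 声明死信交换机与死信队列,并按死信路由键绑定
channel.exchangeDeclare("logistics.dlx", "direct", true);
channel.queueDeclare("logistics.dead.queue", true, false, false, null);
channel.queueBind("logistics.dead.queue", "logistics.dlx", "logistics.dead");
// 业务队列中被 basicNack(requeue=false) 拒绝或过期的消息会进入 logistics.dead.queue,由专门的消费者做补偿处理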
顺序消息处理
// RabbitMQ 只保证单个队列内的消息顺序
// 方案一:让一个队列只挂一个消费者
// 方案二:声明单活跃消费者队列(Single Active Consumer,RabbitMQ 3.8+),同一时刻只有一个消费者在消费
Map<String, Object> args = new HashMap<>();
args.put("x-single-active-consumer", true);
channel.queueDeclare("ordered.queue", true, false, false, args);
Kafka应用 - 用户行为分析系统
用户点击、浏览、购买行为收集与分析
①、部署与安装
# 启动Zookeeper
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.6

# 启动Kafka
docker run -d --name kafka -p 9092:9092 \
  --link zookeeper \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  confluentinc/cp-kafka:6.2.0

# 创建主题
docker exec kafka kafka-topics --create \
  --topic user_behavior \
  --partitions 3 \
  --replication-factor 1 \
  --zookeeper zookeeper:2181
②、生产者
- Producer<String, String> producer = new KafkaProducer<>(props)
- producer.send()
public class UserBehaviorProducer {
    private static final String TOPIC = "user_behavior";
    private static final String[] BEHAVIOR_TYPES = {"click", "view", "purchase"};

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all"); // 确保消息被所有副本接收
        props.put("retries", 3); // 发送失败重试次数
        props.put("batch.size", 16384); // 批量发送大小
        props.put("linger.ms", 1); // 发送延迟
        props.put("buffer.memory", 33554432); // 缓冲区大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 模拟1000条用户行为
            for (int i = 0; i < 1000; i++) {
                String userId = "user_" + (i % 100); // 100个用户
                String behaviorType = BEHAVIOR_TYPES[(int) (Math.random() * BEHAVIOR_TYPES.length)];
                String itemId = "item_" + (int) (Math.random() * 50); // 50个商品

                UserBehavior behavior = new UserBehavior(
                        userId,
                        behaviorType,
                        itemId,
                        new Date(),
                        System.currentTimeMillis());

                // 使用用户ID作为key,确保同一用户的消息进入同一分区
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        TOPIC, userId, JSON.toJSONString(behavior));

                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("发送消息失败: " + exception.getMessage());
                    } else {
                        System.out.printf("发送消息成功: topic=%s, partition=%d, offset=%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });

                // 随机延迟
                Thread.sleep((long) (Math.random() * 100));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
③、消费者
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
public class UserBehaviorConsumer {
    private static final String TOPIC = "user_behavior";
    private static final String GROUP_ID = "user_behavior_group";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", GROUP_ID);
        props.put("enable.auto.commit", "false"); // 手动提交offset
        props.put("auto.offset.reset", "earliest"); // 从最早开始消费
        props.put("max.poll.records", "100"); // 每次poll最大记录数
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                // 按分区处理保证顺序
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        try {
                            UserBehavior behavior = JSON.parseObject(record.value(), UserBehavior.class);
                            System.out.printf("收到用户行为: partition=%d, offset=%d, key=%s, value=%s%n",
                                    record.partition(), record.offset(), record.key(), behavior);
                            // 处理用户行为
                            processBehavior(behavior);
                        } catch (Exception e) {
                            System.err.println("处理消息失败: " + e.getMessage());
                            // 错误处理逻辑...
                        }
                    }
                    // 按分区提交offset
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            consumer.close();
        }
    }

    private static void processBehavior(UserBehavior behavior) {
        // 实际业务处理逻辑
        switch (behavior.getBehaviorType()) {
            case "click":
                System.out.println("处理点击行为: " + behavior);
                break;
            case "view":
                System.out.println("处理浏览行为: " + behavior);
                break;
            case "purchase":
                System.out.println("处理购买行为: " + behavior);
                break;
        }
    }
}
高吞吐量配置
// 生产者配置
props.put("batch.size", 32768); // 增大批量大小
props.put("linger.ms", 10); // 适当增加延迟
props.put("compression.type", "snappy"); // 启用压缩// 消费者配置
props.put("fetch.min.bytes", 1024); // 最小抓取字节
props.put("fetch.max.wait.ms", 500); // 最大等待时间
消息可靠性保障
// 生产者配置
props.put("acks", "all"); // 所有副本确认
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 1); // 保证顺序// 消费者配置
props.put("enable.auto.commit", "false"); // 手动提交
props.put("auto.offset.reset", "earliest"); // 从最早开始
消息积压处理
// 解决方案:
// 1. 增加分区数
// 2. 增加消费者实例
// 3. 提高消费者并行度

// 消费者配置优化
props.put("max.poll.records", 500); // 增加每次poll数量
props.put("max.partition.fetch.bytes", 1048576); // 增加分区fetch大小