当前位置: 首页 > ai >正文

从 JMS 到 ActiveMQ:API 设计与扩展机制分析(三)

三、ActiveMQ API 设计解析

(一)对 JMS API 的实现与扩展

ActiveMQ 作为 JMS 规范的一种实现,全面且深入地实现了 JMS API,确保了其在 Java 消息服务领域的兼容性和通用性。在核心接口实现方面,ActiveMQ 对 JMS 的 ConnectionFactory、Connection、Session 等接口提供了具体的实现类。例如,ActiveMQConnectionFactory 是 ActiveMQ 对 ConnectionFactory 接口的实现,通过它可以创建与 ActiveMQ 服务器的连接,并且在创建过程中可以设置各种连接参数,如连接地址、用户名、密码等,以满足不同的连接需求 。

ActiveMQConnection 则是对 Connection 接口的实现,它负责管理与 ActiveMQ 服务器的物理连接,包括连接的建立、启动、停止等操作。在这个连接之上创建的 ActiveMQSession,实现了 Session 接口,用于提供一个单线程的上下文环境,在这个环境中可以进行消息的发送、接收、创建等操作,并且可以管理事务和消息确认模式 。

除了实现 JMS 的标准接口,ActiveMQ 还扩展了一些特有的接口和功能,以增强其消息处理能力和灵活性。ActiveMQ 提供了 ActiveMQDestination 接口,它是对 JMS 的 Destination 接口的扩展,增加了一些 ActiveMQ 特有的属性和方法,用于更细粒度地控制消息的目的地。在消息发送方面,ActiveMQ 的 MessageProducer 扩展了 JMS 的 MessageProducer 接口,提供了一些额外的发送方法,如 send (Message message, DeliveryMode deliveryMode, int priority, long timeToLive),通过这个方法可以更灵活地设置消息的投递模式、优先级和过期时间等参数 。

通过代码示例可以更直观地展示使用 ActiveMQ 特有 API 实现消息操作的过程。以下是一个使用 ActiveMQConnectionFactory 创建连接并发送消息的示例:

 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.MessageProducer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQProducer {

public static void main(String[] args) throws Exception {

// 创建ActiveMQConnectionFactory

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

// 创建Connection

Connection connection = factory.createConnection();

connection.start();

// 创建Session

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 创建Queue

Queue queue = session.createQueue("activemqQueue");

// 创建MessageProducer

MessageProducer producer = session.createProducer(queue);

// 创建TextMessage

TextMessage message = session.createTextMessage("This is an ActiveMQ message");

// 使用ActiveMQ特有方法设置消息属性

producer.send(message, javax.jms.DeliveryMode.PERSISTENT, 4, 10000);

System.out.println("Sent message: " + message.getText());

producer.close();

session.close();

connection.close();

}

}

在上述代码中,首先创建了 ActiveMQConnectionFactory 对象,用于创建与 ActiveMQ 服务器的连接。然后通过该工厂创建了 Connection 和 Session 对象,接着创建了 Queue 和 MessageProducer。在发送消息时,使用了 ActiveMQ 扩展的 send 方法,设置了消息的投递模式为持久化、优先级为 4、过期时间为 10000 毫秒。这样就展示了如何使用 ActiveMQ 特有的 API 进行消息的发送操作 。

(二)ActiveMQ 独有的 API 特性

  1. 消息持久化:ActiveMQ 提供了强大的消息持久化功能,确保在系统故障或重启的情况下,消息不会丢失。它支持多种持久化存储方式,如 KahaDB、LevelDB、JDBC 等。通过 ActiveMQ 的 API,可以方便地配置消息的持久化。在创建 MessageProducer 时,可以设置消息的投递模式为 DeliveryMode.PERSISTENT,这样发送的消息就会被持久化存储。以下是一个设置消息持久化的代码示例:
 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.MessageProducer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class PersistentMessageProducer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

Connection connection = factory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue("persistentQueue");

MessageProducer producer = session.createProducer(queue);

// 设置消息为持久化

producer.setDeliveryMode(javax.jms.DeliveryMode.PERSISTENT);

TextMessage message = session.createTextMessage("This is a persistent message");

producer.send(message);

System.out.println("Sent persistent message: " + message.getText());

producer.close();

session.close();

connection.close();

}

}

在实际应用中,消息持久化在订单处理系统中尤为重要。当用户下单后,订单消息被发送到 ActiveMQ,通过设置消息持久化,即使在订单处理过程中系统出现故障,订单消息也不会丢失,确保了订单处理的完整性和可靠性 。

2. 事务处理:ActiveMQ 的 API 支持事务处理,通过 Session 接口可以方便地管理事务。在事务模式下,一系列的消息操作被视为一个原子操作,要么全部成功,要么全部失败。如果事务中的某个消息操作失败,可以回滚整个事务,确保数据的一致性。以下是一个使用事务处理的代码示例:

 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.MessageProducer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TransactionExample {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

Connection connection = factory.createConnection();

connection.start();

// 创建事务性Session

Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

Queue queue = session.createQueue("transactionQueue");

MessageProducer producer = session.createProducer(queue);

TextMessage message1 = session.createTextMessage("Message 1 in transaction");

TextMessage message2 = session.createTextMessage("Message 2 in transaction");

producer.send(message1);

producer.send(message2);

// 提交事务

session.commit();

System.out.println("Messages sent within transaction");

producer.close();

session.close();

connection.close();

}

}

在一个电商系统中,当用户进行购物结算时,可能涉及多个消息操作,如更新库存消息、发送订单消息、通知物流消息等。通过将这些消息操作放在一个事务中,可以确保在整个购物结算过程中,要么所有的消息都被成功发送和处理,要么都不被处理,避免出现部分操作成功、部分操作失败的不一致情况 。

3. 集群配置:ActiveMQ 支持集群部署,以提高系统的可用性和性能。通过 ActiveMQ 的集群 API,可以方便地配置和管理集群。在集群环境下,多个 ActiveMQ Broker 可以协同工作,实现负载均衡和故障转移。例如,可以使用 Master-Slave 模式或 Replicated LevelDB Store 模式来构建集群。以下是一个简单的集群配置示例(以 Master-Slave 模式为例):

在 ActiveMQ 的配置文件 activemq.xml 中进行如下配置:

 

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="masterBroker" dataDirectory="${activemq.data}">

<transportConnectors>

<transportConnector name="openwire" uri="tcp://localhost:61616"/>

</transportConnectors>

<replicas>

<replica uri="tcp://slaveBroker:61617"/>

</replicas>

</broker>

在 Slave Broker 的配置文件中:

 

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="slaveBroker" dataDirectory="${activemq.data}">

<transportConnectors>

<transportConnector name="openwire" uri="tcp://slaveBroker:61617"/>

</transportConnectors>

<masterConnector uri="tcp://masterBroker:61616"/>

</broker>

在应用程序中,连接到集群时可以使用 failover 协议:

 

ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://slaveBroker:61617)");

在大型电商系统的促销活动中,高并发的订单消息处理对系统的性能和可用性提出了极高的要求。通过 ActiveMQ 的集群配置,可以将订单消息负载均衡到多个 Broker 上进行处理,当某个 Broker 出现故障时,其他 Broker 可以自动接管,确保订单消息的可靠处理,提高系统的稳定性和吞吐量 。

(三)与 JMS API 的对比分析

  1. 接口设计对比:JMS API 作为 Java 消息服务的规范,定义了一套通用的接口和类,旨在实现与具体消息中间件实现无关的编程,具有高度的抽象性和通用性。它提供了统一的方式来创建连接、会话,发送和接收消息,使得开发者可以编写可移植的消息处理代码,能够在不同的 JMS Provider 之间切换而无需大幅修改代码 。

而 ActiveMQ API 在实现 JMS API 的基础上,进行了扩展和优化。在接口设计上,ActiveMQ 增加了一些特有的接口和方法,以满足其自身的功能需求和特性。ActiveMQConnectionFactory 相比于 JMS 的 ConnectionFactory,提供了更多的配置选项和功能,如可以设置自定义的 WireFormat、TransportFactory 等,以实现更灵活的连接配置 。在消息发送方面,ActiveMQ 的 MessageProducer 扩展了 JMS 的 MessageProducer 接口,提供了更多的发送方法重载,允许更精细地控制消息的投递参数 。

  1. 功能实现对比:在功能实现上,JMS API 定义了基本的消息通信功能,包括消息的发送、接收、消息模型(点对点和发布 / 订阅)、消息确认模式等。它为开发者提供了一个基础的消息处理框架,但对于一些高级功能,如消息持久化的具体实现方式、集群配置等,JMS API 只是提供了抽象的概念,具体的实现留给了 JMS Provider 。

ActiveMQ 在实现 JMS 基本功能的基础上,提供了丰富的高级功能。在消息持久化方面,ActiveMQ 支持多种持久化存储方式,如 KahaDB、LevelDB、JDBC 等,开发者可以根据实际需求选择合适的持久化方案。而 JMS API 本身并没有规定具体的持久化实现方式,不同的 JMS Provider 可能有不同的实现 。在集群配置方面,ActiveMQ 提供了完善的集群解决方案,支持 Master-Slave 模式、Replicated LevelDB Store 模式等多种集群模式,能够满足不同场景下的高可用性和高性能需求 。相比之下,JMS API 没有对集群配置进行详细的规范,ActiveMQ 的集群功能是其特有的扩展 。

  1. 选择建议:在开发过程中,选择 JMS API 还是 ActiveMQ API,需要根据具体的需求来决定。如果项目追求高度的可移植性,希望能够在不同的 JMS Provider 之间轻松切换,并且只需要使用基本的消息通信功能,那么使用 JMS API 是一个不错的选择。例如,一个跨平台的分布式系统,可能会在不同的环境中使用不同的 JMS Provider,此时使用 JMS API 可以确保代码的通用性和可移植性 。

如果项目使用的是 ActiveMQ 作为消息中间件,并且需要利用 ActiveMQ 特有的功能,如特定的消息持久化方式、集群配置、更灵活的消息发送控制等,那么使用 ActiveMQ API 可以更好地发挥 ActiveMQ 的优势,提高系统的性能和可靠性。在一个对消息可靠性要求极高的金融系统中,使用 ActiveMQ 的消息持久化和事务处理功能,可以确保金融交易消息的准确和完整;在一个高并发的电商系统中,利用 ActiveMQ 的集群配置功能,可以提高系统的吞吐量和可用性 。

四、ActiveMQ 扩展机制探秘

(一)插件扩展机制

ActiveMQ 的插件扩展机制是其强大扩展性的重要体现,它允许开发者通过编写自定义插件来增强 ActiveMQ 的功能。该机制的原理基于责任链模式,每个插件都可以对消息的处理过程进行拦截和修改。当消息在 ActiveMQ 中传递时,会依次经过各个插件,每个插件可以根据自身的逻辑对消息进行处理,如验证消息的合法性、修改消息的属性、记录消息的日志等 。

在实现方式上,开发者需要实现 BrokerPlugin 接口,该接口定义了 installPlugin 方法,用于将插件安装到 Broker 中。在安装过程中,插件会获取到 Broker 的引用,从而可以对 Broker 的行为进行扩展。以下是一个简单的自定义插件示例,用于记录消息的发送时间:

 

import org.apache.activemq.broker.Broker;

import org.apache.activemq.broker.BrokerPlugin;

import org.apache.activemq.broker.ConnectionContext;

import org.apache.activemq.command.Message;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class MessageSendTimePlugin implements BrokerPlugin {

private static final Logger LOGGER = LoggerFactory.getLogger(MessageSendTimePlugin.class);

@Override

public Broker installPlugin(Broker broker) throws Exception {

return new MessageSendTimeFilter(broker);

}

private static class MessageSendTimeFilter extends BrokerFilter {

public MessageSendTimeFilter(Broker next) {

super(next);

}

@Override

public void send(ConnectionContext context, Message message) throws Exception {

message.setLongProperty("sendTime", System.currentTimeMillis());

LOGGER.info("Message send time set: {}", message.getLongProperty("sendTime"));

super.send(context, message);

}

}

}

在上述代码中,MessageSendTimePlugin 实现了 BrokerPlugin 接口,在 installPlugin 方法中返回了一个自定义的 BrokerFilter 子类 MessageSendTimeFilter。MessageSendTimeFilter 重写了 send 方法,在消息发送前设置了消息的发送时间属性,并记录日志。

ActiveMQ 提供了多种常用插件,它们在不同的场景中发挥着重要作用。安全插件用于实现消息系统的安全认证和授权,确保只有合法的用户和客户端才能访问和操作消息队列和主题。例如,通过配置 SimpleAuthenticationPlugin 插件,可以实现基于用户名和密码的简单认证;通过配置 AuthorizationPlugin 插件,可以对不同的用户和角色设置不同的访问权限,如对某个队列或主题的读、写、管理权限等 。

持久化插件则负责将消息持久化到存储介质中,以确保在系统故障或重启时消息不会丢失。ActiveMQ 支持多种持久化插件,如 KahaDB、LevelDB、JDBC 等。KahaDB 是 ActiveMQ 默认的持久化插件,它使用文件系统来存储消息,具有高性能和可靠性;LevelDB 是一个基于键值对的持久化存储引擎,适用于对读写性能要求较高的场景;JDBC 插件则允许使用关系数据库来存储消息,方便与现有的数据库系统集成 。

在实际案例中,假设有一个电商系统,为了确保消息的安全传输,需要对连接到 ActiveMQ 的客户端进行身份认证。可以开发一个基于 Redis 的认证插件,在客户端连接时,插件从 Redis 中获取用户的认证信息,验证客户端的用户名和密码是否正确。以下是一个简单的基于 Redis 的认证插件示例:

 

import org.apache.activemq.broker.Broker;

import org.apache.activemq.broker.BrokerPlugin;

import org.apache.activemq.broker.ConnectionContext;

import org.apache.activemq.broker.region.Subscription;

import org.apache.activemq.command.ConnectionInfo;

import org.apache.activemq.command.ConsumerInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

public class RedisAuthPlugin implements BrokerPlugin {

private static final Logger LOGGER = LoggerFactory.getLogger(RedisAuthPlugin.class);

private JedisPool jedisPool;

public RedisAuthPlugin(JedisPool jedisPool) {

this.jedisPool = jedisPool;

}

@Override

public Broker installPlugin(Broker broker) throws Exception {

return new RedisAuthFilter(broker, jedisPool);

}

private static class RedisAuthFilter extends BrokerFilter {

private JedisPool jedisPool;

public RedisAuthFilter(Broker next, JedisPool jedisPool) {

super(next);

this.jedisPool = jedisPool;

}

@Override

public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {

try (Jedis jedis = jedisPool.getResource()) {

String storedPassword = jedis.get(info.getUserName());

if (storedPassword == null ||!storedPassword.equals(info.getPassword())) {

throw new SecurityException("Invalid username or password");

}

LOGGER.info("User {} authenticated successfully", info.getUserName());

}

super.addConnection(context, info);

}

}

}

在配置文件 activemq.xml 中,添加如下配置:

 

<beans>

<bean id="jedisPool" class="redis.clients.jedis.JedisPool">

<constructor-arg name="host" value="localhost"/>

<constructor-arg name="port" value="6379"/>

</bean>

<bean id="redisAuthPlugin" class="com.example.RedisAuthPlugin">

<constructor-arg ref="jedisPool"/>

</bean>

</beans>

<broker>

<plugins>

<bean xmlns="http://www.springframework.org/schema/beans" ref="redisAuthPlugin"/>

</plugins>

</broker>

通过上述配置和插件开发,实现了基于 Redis 的客户端身份认证功能,增强了 ActiveMQ 在电商系统中的安全性 。

(二)网络扩展(Network of Brokers)

ActiveMQ 的网络扩展机制,即 Network of Brokers,允许将多个 ActiveMQ Broker 连接在一起,形成一个分布式的消息处理网络。这种机制的核心概念是通过网络连接器(Network Connector)将不同的 Broker 连接起来,使得消息可以在这些 Broker 之间流动,从而实现消息的分布式处理和负载均衡 。

其原理基于消息的转发和路由。当一个 Producer 发送消息到某个 Broker 时,该 Broker 可以根据配置将消息转发到其他与之相连的 Broker 上,直到消息被 Consumer 接收和处理。在这个过程中,每个 Broker 都可以独立地处理消息,并且可以存储消息,从而降低单个 Broker 的负载压力 。

ActiveMQ 提供了多种网络扩展模型,其中 Forward Bridge 模型是较为常用的一种。在 Forward Bridge 模型中,多个 Broker 通过网络连接器(Network Connector)连接在一起,形成一个有向的网络结构。每个 Broker 都可以作为消息的生产者和消费者,消息在这个网络中按照配置的规则进行转发。例如,当一个 Broker 接收到一条消息时,如果它没有对应的消费者,它可以将消息转发到其他有消费者的 Broker 上 。

在集群配置方面,首先需要确保每个 Broker 都有唯一的名称和 ID。然后,通过在 activemq.xml 配置文件中添加 networkConnectors 元素来配置网络连接器。以下是一个简单的集群配置示例:

 

<broker brokerName="broker1" dataDirectory="${activemq.data}">

<networkConnectors>

<networkConnector name="connector1" uri="static:(tcp://broker2:61616)" duplex="true"/>

</networkConnectors>

<transportConnectors>

<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>

</transportConnectors>

</broker>

 

<broker brokerName="broker2" dataDirectory="${activemq.data}">

<networkConnectors>

<networkConnector name="connector2" uri="static:(tcp://broker1:61616)" duplex="true"/>

</networkConnectors>

<transportConnectors>

<transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>

</transportConnectors>

</broker>

在上述配置中,broker1 和 broker2 通过 networkConnector 相互连接,duplex="true" 表示双向连接。这样,消息就可以在两个 Broker 之间双向流动。

网络扩展在提高系统性能和可用性方面有着显著的作用。在性能方面,通过将消息负载均衡到多个 Broker 上,可以提高系统的吞吐量,减少单个 Broker 的处理压力。在可用性方面,当某个 Broker 出现故障时,其他 Broker 可以继续处理消息,确保系统的正常运行。例如,在一个高并发的电商订单处理系统中,大量的订单消息需要及时处理。通过 ActiveMQ 的网络扩展机制,将多个 Broker 组成集群,订单消息可以被分发到不同的 Broker 上进行处理,提高了订单处理的速度和系统的稳定性 。

在实际案例中,假设有一个大型的电商平台,在促销活动期间,订单量会急剧增加。为了应对高并发的订单消息处理,采用 ActiveMQ 的网络扩展机制搭建了一个 Broker 集群。在这个集群中,有多个 Broker 节点,每个节点都配置了相应的 networkConnector,实现了节点之间的互联互通。当用户下单后,订单消息被发送到 ActiveMQ 集群中,集群中的各个 Broker 根据负载情况对订单消息进行处理,大大提高了订单处理的效率和系统的可用性 。

(三)其他扩展方式

  1. 协议扩展:ActiveMQ 支持协议扩展,允许开发者添加自定义的协议来与消息中间件进行通信。其原理是通过实现 TransportFactory 接口来创建自定义的传输协议。TransportFactory 负责创建 Transport 对象,Transport 对象则负责实际的消息传输。例如,如果项目需要使用一种新的网络协议来传输消息,可以实现 TransportFactory 接口,创建一个新的 Transport 实现类,在这个类中实现新协议的消息发送和接收逻辑 。在物联网场景中,设备可能使用特定的轻量级协议进行通信,通过协议扩展,ActiveMQ 可以支持这些设备的接入,实现设备与后端系统之间的消息传递 。
  1. 存储扩展:ActiveMQ 的存储扩展机制允许开发者使用自定义的存储方式来持久化消息。这是通过实现 PersistenceAdapter 接口来实现的。PersistenceAdapter 负责管理消息的存储和检索。比如,如果项目对消息存储有特殊的需求,如需要将消息存储到分布式文件系统中,可以实现 PersistenceAdapter 接口,在实现类中编写将消息存储到分布式文件系统的逻辑 。在大数据处理场景中,消息量巨大,使用传统的存储方式可能无法满足性能和容量要求,通过存储扩展,使用分布式存储系统来存储消息,可以提高消息存储和检索的效率 。
http://www.xdnf.cn/news/5157.html

相关文章:

  • 单脉冲前视成像多目标分辨算法——论文阅读
  • stm32之IIC
  • 基于STM32的居家环境监测报警Proteus仿真+程序设计+设计报告+讲解视频
  • 利用多AI协作实现AI编辑器高效开发:创新架构与实践基本构想
  • DeepSeek 实现趣味心理测试应用开发教程
  • JAVA自动装箱拆箱
  • 车载电子电器架构 --- 汽车网关概述
  • 【计算机视觉】OpenCV实战项目:Athlete-Pose-Detection 运动员姿态检测系统:基于OpenCV的实时运动分析技术
  • [面试]SoC验证工程师面试常见问题(五)TLM通信篇
  • 引言:Client Hello 为何是 HTTPS 安全的核心?
  • 前端HTMX技术详细解释
  • 第十七次博客打卡
  • AZScreenRecorder最新版:功能强大、操作简便的手机录屏软件
  • 网络编程套接字
  • [白话文] 从百草园RLHF到三味书屋DPO
  • 全栈开发实战:FastAPI + React + MongoDB 构建现代Web应用
  • MCP协议:大模型与外部工具交互的标准化创新方案
  • 从零开始跑通3DGS教程:(四)修改(缩放、空间变换)colmap生成的sfm结果
  • SpringBoot框架开发网络安全科普系统开发实现
  • 分布式事务快速入门
  • 小程序多线程实战
  • 功能齐全的菜谱管理器Tamari
  • [论文阅读]BadPrompt: Backdoor Attacks on Continuous Prompts
  • 23、Next.js:时空传送门——React 19 全栈框架
  • window 显示驱动开发-线性伸缩空间段
  • 简单网络交换、路由二
  • JavaWeb:JDBC
  • 关于ffmpeg的简介和使用总结
  • Kotlin Android LeakCanary内存泄漏检测实战
  • RT-Thread 深入系列 Part 5:物联网与网络应用实战