RabbitMQ快速入门
RabbitMQ快速⼊⻔
引入依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.22.0</version></dependency>
编写生产者代码
具体步骤:
- 建立连接
- 开启信道
- 声明交换机
- 声明队列
- 发送消息
- 释放资源
public static void main(String[] args) throws IOException, TimeoutException {//1.建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(ip地址,填自己的);connectionFactory.setPort(5672); //需提前开放端口号connectionFactory.setUsername("admin");//账号connectionFactory.setPassword("admin");//密码connectionFactory.setVirtualHost("study");//虚拟主机Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机 使用内置的交换机//4.声明队列/*** Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,* Map<String, Object> arguments) throws IOException;** 参数说明:* queue:队列名称* durable:可持久化* exclusive:是否独占* autoDelete:是否自动删除* arguments:额外参数*/channel.queueDeclare("hello",true,false,false,null);//5.发消息/*** void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;** 参数声明:* exchange:交换机名称,不写代表使用内置交换机* routingKey:路由名称, routingKey = 队列名称 (使用内置交换机,routingKey与队列名称保持一致)* props:属性配置* body:消息*/for (int i = 0; i < 10; i++) {String msg = "hello rabbitmq ~" + i;channel.basicPublish("","hello",null,msg.getBytes());}System.out.println("消息发送成功~");//6.资源释放channel.close();connection.close();}
编写消费者代码
具体步骤:
- 建立连接
- 创建信道
- 声明队列
- 消费消息
- 释放资源
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1.建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(ip地址);connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("study");Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();//3.声明队列(可以省略)channel.queueDeclare("hello",true,false,false,null);//4.消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息,就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};/*** String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;* 参数说明:* queue:队列名称* autoAck:是否自动确认* callback:接收到消息后,执行的逻辑*/channel.basicConsume("hello",true,consumer);//等待程序完成Thread.sleep(5000);//5.释放资源channel.close();connection.close();}