Allen Blog Allen Blog

SELF SPACE

目录
RabbitMQ搭建测试
/    

RabbitMQ搭建测试

RabbitMQ

简介

串行:业务中需要发请求,必须等请求响应过来了,才可以进行下一步的代码操作

消息传递:服务与服务之间 通过消息发送数据来通信,而不是互相调用

排队:指得是应用程序通过队列来通信

作用:

异步处理 A B C三个请求,B C不会影响A请求

解耦服务 避免每个服务之间的冗余联系

流量削峰 避免流量堆积

Linux安装

三个rpm包

1:erlang

2:socat

3:RabbitMQ

启动服务

31  cd /usr/lib/rabbitmq/
   32  ls
   33  cd bin/
   34  ls
   35  rabbitmq-plugins enable rabbitmq_management
   36  systemctl restart rabbitmq-server
   37  systemctl start rabbitmq-server
   38  systemctl status rabbitmq-server

三个端口

15672 web页面

25672 集群接口

5672 接受命令的端口

[root@localhost bin]# netstat -anp|grep beam
tcp        0      0 0.0.0.0:15672           0.0.0.0:*               LISTEN      20728/beam.smp
tcp        0      0 0.0.0.0:25672           0.0.0.0:*               LISTEN      20728/beam.smp
tcp        0      0 127.0.0.1:59040         127.0.0.1:4369          ESTABLISHED 20728/beam.smp
tcp6       0      0 :::5672                 :::*                    LISTEN      20728/beam.smp
unix  3      [ ]         STREAM     CONNECTED     43996    20728/beam.smp
unix  3      [ ]         STREAM     CONNECTED     49040    20728/beam.smp
[root@localhost bin]#

创建管理员

[root@localhost ~]# rabbitmqctl add_user admin admin						#添加用户
Adding user "admin" ...
[root@localhost ~]# rabbitmqctl set_user_tags admin administrator			#绑定权限
Setting tags for user "admin" to [administrator] ...
[root@localhost ~]# rabbitmqctl change_password admin 123456				#修改密码
Changing password for user "admin" ...

管理界面

Connections:客户端和RabbitMQ所建立的连接

Channels:通道 传输数据

Exchanges:交换机 将数据转发到队列中

Queues:队列

Consumers:消费者

工作流程图

**Broker:**接收和分发消息的应用,RabbitMQ Server就是 Message Broker

**Virtual host:**出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

**Connection:**publisher/consumer 和 broker 之间的 TCP 连接

**Channel:**如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

**Exchange:**message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

**Queue:**消息最终被送到这里等待 consumer 取走

**Binding:**exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

管理界面

使用MQ

Java获取MQ连接对象

SpringBoot Pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>demo02-mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo02-mq</name>
    <description>demo02-mq</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.6.RELEASE</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.example.demo02.mq.Demo02MqApplication</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
connection Util 类
package com.example.demo02.mq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Allen
 * 4/10/2024 7:25 PM
 * @version 1.0
 * @description: MQ连接工具类
 *
 */
public class ConnectionUtils {
    //为什么使用静态代码块初始化连接工厂?
    //因为连接工厂只需要初始化一次,所以使用静态代码块初始化
    private static ConnectionFactory connectionFactory;
    static {
        // 创建连接工厂
        connectionFactory = new ConnectionFactory();
        //mq服务主机地址
        connectionFactory.setHost("ningbo-3689d402.of-7af93c01.shop");
        //连接端口
        connectionFactory.setPort(40991);
        connectionFactory.setVirtualHost("/my240410");
        //设置用户名
        connectionFactory.setUsername("allen");
        //设置密码
        connectionFactory.setPassword("123456");
    }
    public static Connection getConnection() {
        try {
            //返回连接 通过工厂获取连接
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}


@SpringBootTest
class Demo02MqApplicationTests {

    @Test
    void contextLoads() {
        Connection connection = ConnectionUtils.getConnection();
        System.out.println(connection);
    }

}

结果:
    amqp://allen@110.42.14.112:40991//my240410

五大消息模型

* 操作MQ
 *  生产者:发送消息
 *  消费者:消费信息
 *
 *  五大消息模型:
 *      1、simple消息模型
 *          一个生产者、一个消费者、一个队列
 *      2、work消息模型
 *          一个生产者、多个消费者、一个队列
 *          能者多劳:消费者性能高就可以多消费一些信息
 *      3、fanout消息模型
 *          一个生产者、一个交换机、多个队列、多个消费者
 *          怎么实现一个生产者发送消息给多个消费者?
 *          生产者发送消息给交换机,交换机发送消息给队列,队列发送消息给消费者
 *          fanout:广播的意思,发送消息给所有绑定的队列
 *      4、direct消息模型
 *          一个生产者、一个交换机、多个队列、多个消费者 (队列绑定交换机时需要指定routingKey)
 *      5、topic消息模型
 *          一个生产者/多个生产者、一个交换机、多个队列、多个消费者 (队列绑定交换机时使用的routingKey是通配符)
 *              *:通配一级任意多个字符
 *              #:通配多级任意多个字符
 *              例如:user.*:匹配user.add、user.delete
 *                   user.#:匹配user.add、user.delete、user.add.delete
simple消息模型
生产者
package com.example.demo02.mq.simple;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

/**
 * @author Allen
 * 4/10/2024 8:07 PM
 * @version 1.0
 * @description: 简单模型-生产者
 */
public class SimpleSender {
    public static void main(String[] args) throws Exception {
        // 1、创建连接
        Connection connection = ConnectionUtils.getConnection();
        // 2、创建通道
        Channel channel = connection.createChannel();
        // 3、声明队列:队列可以缓存数据、mq的交换机不能存储数据 simple模型使用的是默认交换机
        // 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:其他属性
        channel.queueDeclare("simple_queue", false, false, false, null);
        // 4、发送消息:发送到队列中
        // 在simple模型中   参数1:交换机 参数2:队列名称 参数3:传递消息额外设置 参数4:消息的具体内容(任意数据类型)
        String msg = "hello rabbitmq";
        channel.basicPublish("", "simple_queue", null, msg.getBytes());
        // 5、关闭通道
        channel.close();
        // 6、关闭连接
        connection.close();
    }
}
消费者
package com.example.demo02.mq.simple;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author Allen
 * 4/10/2024 8:59 PM
 * @version 1.0
 * @description: 简单模型-消费者
 */
public class SimpleReciver {
    public static void main(String[] args) throws Exception {
        // 1、创建连接
        Connection connection = ConnectionUtils.getConnection();
        // 2、创建通道
        Channel channel = connection.createChannel();
        // 3、声明队列:如果已声明可以忽略
        // 4、监听队列:
        channel.queueDeclare("simple_queue", false, false, false, null);
        Consumer consumer = new DefaultConsumer(channel){
            // 消息的具体处理逻辑
            // 消费者接收到消息后调用此方法
            // 如果AutoAck为true,此方法一旦被调用,消息会被确认
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者接收到消息:"+new String(body));
                /*
                channel.basicReject();    //拒绝消息,可以丢弃消息,也可以重新入队
                channel.basicNack();      //不确认消息,可以设置是否重新入队
                */
                channel.basicAck(envelope.getDeliveryTag(),false);         //手动确认消息,参数1:消息的标识,参数2:是否开启多个消息同时确认
            }
        };
/*        参数1:队列名称 参数2:是否自动确认 参数3:消费者
        队列名称:消费哪个队列的消息
        AutoAck:是否自动确认,如果为true,表示消息一旦被接收,自动向mq回复接收到,mq会删除消息,如果为false,需要手动确认,如果消费者挂掉,消息会被重回队列
        Consumer:消费者对象
        channel.basicConsume("simple_queue", true,consumer);
        避免消息丢失,手动确认
*/
        channel.basicConsume("simple_queue", false,consumer);

    }
}

Unacked : 消费者用了、但是没有告诉交换机、就是未确认的状态

当消息被消费了,且告诉交换机了,交换机就会将这个消息删除

Work消息模型
* work模型:
*      多个消费者消费同一个队列中的消息,每个消费者获取到的消息唯一,且只能消费一次
*      作用:提高消息的消费速度,避免消息的堆积
*      默认采用轮询的方式分发消息
*      如果某个消费者处理消息慢,会导致消息堆积
生产者
package com.example.demo02.mq.work;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * @author Allen
 * 4/10/2024 9:37 PM
 * @version 1.0
 * @description: work模式发送者
 *
 * work模型:
 *      多个消费者消费同一个队列中的消息,每个消费者获取到的消息唯一,且只能消费一次
 *      作用:提高消息的消费速度,避免消息的堆积
 *      默认采用轮询的方式分发消息
 *      如果某个消费者处理消息慢,会导致消息堆积
 */
public class WorkSender {
    public static void main(String[] args) throws Exception {
//        1:获取连接
        Connection connection = ConnectionUtils.getConnection();
//        2:创建通道
        Channel channel = connection.createChannel();
//        3:声明队列
        // 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:队列的属性
        channel.queueDeclare("work.queue", false, false, false, null);
//        4:发送100条消息
for (int i = 0; i < 100; i++) {
            String msg = "work模式消息" + i;
            //休眠i*5毫秒
            TimeUnit.MILLISECONDS.sleep(i * 5);
            // 参数1:交换机名称 参数2:队列名称 参数3:消息的其他属性 参数4:消息的内容
            channel.basicPublish("", "work.queue", null, msg.getBytes());
            System.out.println("work模式发送消息:" + msg);
        }
//        5:关闭通道
        channel.close();
//        6:关闭连接
        connection.close();
    }
}
消费者1

(能者多劳角色)

package com.example.demo02.mq.work;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author Allen
 * 4/10/2024 9:37 PM
 * @version 1.0
 * @description: work模式消费者1号
 */
public class WorkReciver1 {
    public static void main(String[] args) throws Exception {
        // 1:获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 2:创建通道
        Channel channel = connection.createChannel();
        // 3:声明队列
        // 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:队列的属性
        channel.queueDeclare("work.queue", false, false, false, null);
        // 4:定义消费者,消费消息
        // 参数1:队列名称 参数2:是否自动确认消息 参数3:消费者对象
        Consumer consumer = new DefaultConsumer(channel) {
            // 消费者接收消息调用此方法
            // 参数1:消费者标签 参数2:队列参数 参数3:消息属性 参数4:消息内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 获取消息
                String msg = new String(body);
                System.out.println("work模式消费者1号接收消息:" + msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume("work.queue", false, consumer);

    }
}
消费者2

(消费能力差)

package com.example.demo02.mq.work;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * @author Allen
 * 4/10/2024 9:37 PM
 * @version 1.0
 * @description: work模式消费者1号
 */
public class WorkReciver2 {
    public static void main(String[] args) throws Exception {
        // 1:获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 2:创建通道
        Channel channel = connection.createChannel();
        // 3:声明队列
        // 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:队列的属性
        channel.queueDeclare("work.queue", false, false, false, null);
            //如果此消费者性能较差,配置能者多劳:指定一次获取几条信息,消息消费成功后 ack之后 mq才会发送下一条消息
            channel.basicQos(1);
        // 4:定义消费者,消费消息
        // 参数1:队列名称 参数2:是否自动确认消息 参数3:消费者对象
        Consumer consumer = new DefaultConsumer(channel) {
            // 消费者接收消息调用此方法
            // 参数1:消费者标签 参数2:队列参数 参数3:消息属性 参数4:消息内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //模拟二号消费者处理消息慢
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 获取消息:执行业务
                    String msg = new String(body);
                    System.out.println("work模式消费者2号接收消息:" + msg);
                    channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 参数1:队列名称 参数2:ACK是否自动确认 参数3:消费者对象
        //必须手动确认消息,否则会报406错误
        channel.basicConsume("work.queue", false, consumer);

    }
}
结果:

能者多劳

Fanout消息模型
* 广播模型:
    *  一个交换机绑定多个队列
    *  每个队列都有一个消费者
    *  每个消费者消费自己队列中的消息,每个队列的信息是一样的
生产者
package com.example.demo02.mq.fanout;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

/**
 * @author Allen
 * 4/11/2024 8:24 AM
 * @version 1.0
 * @description: 广播模型发送者
 *
 * 广播模型:
     *  一个交换机绑定多个队列
     *  每个队列都有一个消费者
     *  每个消费者消费自己队列中的消息,每个队列的信息是一样的
 */
public class FanoutSender {
    public static void main(String[] args) throws Exception {
        // 1:获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 2:创建通道
        Channel channel = connection.createChannel();
        // 3:声明交换机
        // 参数1:交换机名称 参数2:交换机类型 (fanout direct topic) 参数3:是否持久化
        /*
            fanout:广播模式
                绑定了这个交换机的队列都会收到消息
            direct:路由模式
                通过路由键完全匹配的队列会收到消息
            topic:通配符模式
                通过通配符匹配的队列会收到消息
        */
        channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT,false);
        // 交换机不会存储消息,只是负责消息的转发,如果没有队列绑定到交换机上,消息会丢失
        // 4:发送消息到交换机:需要消费信息的消费者自己声明自己的队列绑定到当前交换机上
        String msg = "fanout message";
        channel.basicPublish("fanout.exchange", "", null, msg.getBytes());
        // 5:关闭通道
        channel.close();
        // 6:关闭连接
        connection.close();
    }
}
消费者1
package com.example.demo02.mq.fanout;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author Allen
 * 4/11/2024 8:55 AM
 * @version 1.0
 * @description: 广播模型接收者
 */
public class FanoutReceiver1 {
    public static void main(String[] args) throws Exception {
        // 1:获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 2:创建通道
        Channel channel = connection.createChannel();
        // 3:声明交换机
        //为什么消费者也得声明交换机?如果消费者先启动,那么交换机还没有声明,消费者就会报错,所以消费者也得声明交换机
        // 参数1:交换机名称 参数2:交换机类型 参数3:是否持久化
        channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT,false);
        // 4:声明队列
        // 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:其他参数
        channel.queueDeclare("fanout.queue1", false, false, false, null);
        // 5:绑定自己的队列到交换机
        channel.queueBind("fanout.queue1", "fanout.exchange", "");
        // 6:消费消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            // 参数1:消费者标签 参数2:消息传递参数 参数3: 参数4:消息内容
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 消费消息
                System.out.println("Fanout1接收到的消息是:" + new String(body));
                // 手动确认消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("fanout.queue1",false,consumer);
    }
}
消费者2
package com.example.demo02.mq.fanout;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author Allen
 * 4/11/2024 8:55 AM
 * @version 1.0
 * @description: 广播模型接收者
 */
public class FanoutReceiver2 {
    public static void main(String[] args) throws Exception {
        // 1:获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 2:创建通道
        Channel channel = connection.createChannel();
        // 3:声明交换机
        //为什么消费者也得声明交换机?如果消费者先启动,那么交换机还没有声明,消费者就会报错,所以消费者也得声明交换机
        channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT,false);
        // 4:声明队列
        // 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:其他参数
        channel.queueDeclare("fanout.queue2", false, false, false, null);
        // 5:绑定队列到交换机
        channel.queueBind("fanout.queue2", "fanout.exchange", "");
        // 6:消费消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            // 参数1:消费者标签 参数2:消息传递参数 参数3: 参数4:消息内容
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 消费消息
                System.out.println("Fanout2接收到的消息是:" + new String(body));
                // 手动确认消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("fanout.queue2",false,consumer);
    }
}
结果

Direct消息模型
  • 路由模型:
  • 一个交换机可以绑定多个队列
  • 生产者给交换机发送消息时,需要指定消息的路由键
  • 消费者绑定队列到交换机时,需要指定所需要消费的信息的路由键
  • 交换机会根据消息的路由键将消息转发到对应的队列
  • 缺点:
  • 当消息很多的时候,需要指定的路由键也会很多,究极复杂。
    
###### 生产者
package com.example.demo02.mq.direct;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

/**
 * @author Allen
 * 4/11/2024 9:30 AM
 * @version 1.0
 * @description: 路由模型发送者
 *
 * 路由模型:
 *    一个交换机可以绑定多个队列
 *    生产者给交换机发送消息时,需要指定消息的路由键
 *    消费者绑定队列到交换机时,需要指定所需要消费的信息的路由键
 *    交换机会根据消息的路由键将消息转发到对应的队列

 *    缺点:
 *      当消息很多的时候,需要指定的路由键也会很多,究极复杂。
 */
public class DirectSender {
    public static void main(String[] args) throws Exception {
        // 1:创建连接
        Connection connection = ConnectionUtils.getConnection();
        // 2:创建通道
        Channel channel = connection.createChannel();
        // 3:声明交换机   参数1:交换机名称 参数2:交换机类型 参数3:是否持久化
        channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, false);

        // 6:发送消息
        String msg1 = "{To DirectReceiver1: orderId:1001}";
        String msg2 = "{To DirectReceiver2: orderId:1002}";
        // 参数1:交换机 参数2:路由键(与消费者相匹配) 参数3:其他参数 参数4:消息内容
        channel.basicPublish("direct.exchange","order.save",null,msg1.getBytes());
        channel.basicPublish("direct.exchange","order.update",null,msg2.getBytes());
        // 7:关闭通道
        channel.close();
        // 8:关闭连接
        connection.close();
    }
}
消费者1
package com.example.demo02.mq.direct;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author Allen
 * 4/11/2024 9:44 AM
 * @version 1.0
 * @description: 路由模型接收者1
 */
public class DirectReceiver1 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, false);

        channel.queueDeclare("direct.queue1", false, false, false, null);

        channel.queueBind("direct.queue1","direct.exchange","order.save");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("DirectReceiver1接收到的新增订单消息是:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("direct.queue1",false,consumer);
    }
}
消费者2
package com.example.demo02.mq.direct;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author Allen
 * 4/11/2024 9:44 AM
 * @version 1.0
 * @description: 路由模型接收者2
 */
public class DirectReceiver2 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, false);

        channel.queueDeclare("direct.queue2", false, false, false, null);

        channel.queueBind("direct.queue2","direct.exchange","order.update");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("DirectReceiver2接收到的修改订单消息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("direct.queue2",false,consumer);
    }
}
结果

Topic消费模型
*  通配符模型
*      生产者必须指定完整且准确的路由key
*      消费者可以使用通配符
*          *:可以替代一级的任意字符 add.*     ==>     add.user                add.goods
*          #:可以替代多级的任意字符 add.#     ==>     add.user.name           add.user.name.firstName
生产者
package com.example.demo02.mq.topic;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

/**
 * @author Allen
 * 4/11/2024 10:15 AM
 * @version 1.0
 * @description: 通配符模型发送者
 *  通配符模型
 *      生产者必须指定完整且准确的路由key
 *      消费者可以使用通配符
 *          *:可以替代一级的任意字符 add.*     ==>     add.user                add.goods
 *          #:可以替代多级的任意字符 add.#     ==>     add.user.name           add.user.name.firstName
 */
public class TopicSender {
    public static void main(String[] args) throws Exception {
        // 1:获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 2:创建通道
        Channel channel = connection.createChannel();
        // 3:声明交换机
        channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,false);
        // 4:发送消息 路由Key写法 goods.add 不要使用通配符
        String msg1 = "商品新增了,Topic模型,routing key 为 goods.add";
        String msg2 = "商品修改了,Topic模型,routing key 为 goods.update";
        String msg3 = "商品删除了,Topic模型,routing key 为 goods.delete";
        String msg4 = "用户新增了,Topic模型,routing key 为 user.add";
        String msg5 = "用户修改了,Topic模型,routing key 为 user.update";
        String msg6 = "用户删除了,Topic模型,routing key 为 user.delete";
        String msg7 = "添加了用户名字,Topic模型,routing key 为 user.add.name";
        String msg8 = "添加了用户年龄,Topic模型,routing key 为 user.add.age";
        String msg9 = "修改了用户名字,Topic模型,routing key 为 user.update.name";
        String msg10 = "修改了用户年龄,Topic模型,routing key 为 user.update.age";


        channel.basicPublish("topic.exchange","goods.add",null,msg1.getBytes());
        channel.basicPublish("topic.exchange","goods.update",null,msg2.getBytes());
        channel.basicPublish("topic.exchange","goods.delete",null,msg3.getBytes());
        channel.basicPublish("topic.exchange","user.add",null,msg4.getBytes());
        channel.basicPublish("topic.exchange","user.update",null,msg5.getBytes());
        channel.basicPublish("topic.exchange","user.delete",null,msg6.getBytes());
        channel.basicPublish("topic.exchange","user.add.name",null,msg7.getBytes());
        channel.basicPublish("topic.exchange","user.add.age",null,msg8.getBytes());
        channel.basicPublish("topic.exchange","user.update.name",null,msg9.getBytes());
        channel.basicPublish("topic.exchange","user.update.age",null,msg10.getBytes());

        // 5:关闭连接
        channel.close();
        connection.close();
    }
}
消费者1
package com.example.demo02.mq.topic;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author Allen
 * 4/11/2024 10:22 AM
 * @version 1.0
 *
 * @description: 通配符模型接收者1
 */
public class TopicReceiver1 {
    public static void main(String[] args) throws Exception {
        // 1:获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 2:创建通道
        Channel channel = connection.createChannel();
        // 3:声明交换机
        channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,false);
        // 4:声明队列
        channel.queueDeclare("topic.queue1", false, false, false, null);
        // 5:绑定队列到交换机   使用通配符* 一级任意字符  # 多级任意字符
        channel.queueBind("topic.queue1", "topic.exchange", "goods.*");
        // 6:消费消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("商品模块接收到的消息是:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("topic.queue1",false,consumer);

    }
}
消费者2
package com.example.demo02.mq.topic;

import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author Allen
 * 4/11/2024 10:22 AM
 * @version 1.0
 *
 * @description: 通配符模型接收者2
 */
public class TopicReceiver2 {
    public static void main(String[] args) throws Exception {
        // 1:获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 2:创建通道
        Channel channel = connection.createChannel();
        // 3:声明交换机
        channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,false);
        // 4:声明队列
        channel.queueDeclare("topic.queue2", false, false, false, null);
        // 5:绑定队列到交换机  使用通配符 user.*  user.#
        channel.queueBind("topic.queue2", "topic.exchange", "user.#");
        // 6:消费消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("用户模块接收到的消息是:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("topic.queue2",false,consumer);

    }
}
结果

数据持久化

数据丢失

会造成的问题:

生产者发送消息失败导致丢失:

1:没有发送到mq的交换机(交换机不存在、网络通信失败)

2:交换机没有绑定队列(交换机会丢弃信息)

解决:生产者确认回调:

rabbitTemplate可以设置 生产者确认回调

1、消息是否到达交换机回调

2、消息没有到达队列的回调

mq宕机导致丢失:

默认mq 交换机队列 消息没有持久化,mq非法关闭的时候,数据会丢失

声明队列交换机发送信息时都配置持久化

消费者消费消息导致丢失 手动ACK

解决

交换机

队列

都需要持久化

使用了持久化的交换机和消息,宕机之后不会被清除

对交换机发送消息,需要指定持久化数据类型

生产者
// 参数1:交换机名称 参数2:交换机类型 参数3:是否持久化
channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,true);
//参数1:交换机名称 参数2:路由key 参数3:消息持久化的数据类型配置 参数4:消息内容
channel.basicPublish("topic.exchange","goods.add", MessageProperties.PERSISTENT_TEXT_PLAIN,msg1.getBytes());

消费者
// 3:声明交换机
channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,true);

使用了持久化的队列 可以保存住队列里面的消息

springboot整合rabbitmq

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.allen</groupId>
    <artifactId>demo03-order-provider</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo03-order-provider</name>
    <description>demo03-order-provider</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.6.RELEASE</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.allen.demo03.order.Demo03OrderProviderApplication</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

application.yaml文件

#端口号
server:
  port: 8110
spring:
  application:
    name: order-service
    #虚拟机重启之后,rabbitmq服务需要重启 systemctl restart rabbitmq-server
    #rabbitmq配置
  rabbitmq:
    host: ningbo-3689d402.of-7af93c01.shop
    port: 40991
    username: admin
    password: admin
    virtual-host: /my240410
    #配置mq生产者确认回调  默认是false
    publisher-returns: true
    #correlated: 使用额外的线程池的线程来调用回调方法
    #simple: 使用发布者的线程池的线程来调用回调方法
    publisher-confirm-type: correlated

死信队列

关键词

死信交换机

死信队列

死信路由key

*      配置死信交换机、死信队列、绑定关系
*      解释死信队列:
*      1、死信队列:当消息被拒绝、消息过期、队列达到最大长度时,消息会进入死信队列
*      2、死信交换机:死信队列绑定的交换机
*      3、死信路由key:消息进入死信队列时,会携带一个路由key
*      4、死信队列绑定死信交换机
*      5、业务队列绑定死信交换机
*      6、业务队列绑定业务交换机

生产者

@Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("business.exchange",
                "stock.bussiness",
                "我是测试死信队列,所发送到business的消息!!!");
    }
config文件

配置发送消息到交换机、队列的回调

也可以配置交换机和队列

package com.allen.demo03.order.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * @author Allen
 * 4/11/2024 2:08 PM
 * @version 1.0
 */
@Configuration
public class RabbitmqConfig implements InitializingBean {
    @Resource
    RabbitTemplate rabbitTemplate;

    //spring管理bean时提供的生命周期方法,可以在当前类对象属性值初始化之后 容器调用afterPropertiesSet方法
    @Override
    public void afterPropertiesSet() throws Exception {
        //确认消息是否到达交换机的回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if(!ack){
                System.out.println("消息没有到达交换机");
                System.out.println("消息是否到达交换机:"+ack);
                System.out.println(ack?"":"未到达原因:"+cause);
                System.out.println("消息:"+correlationData);
            }
        });
        //消息没有到达队列的回调
        rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)->{
            System.out.println("消息没有到达队列");
            System.out.println("消息:"+message);
            System.out.println("应答码:"+replyCode);
            System.out.println("原因:"+replyText);
            System.out.println("交换机:"+exchange);
            System.out.println("路由键:"+routingKey);
        });
    }

    //创建交换机对象添加到spring容器中
    @Bean
    public Exchange payExchange(){
        return ExchangeBuilder.topicExchange("pay.exchange")
                .ignoreDeclarationExceptions()              // 忽略声明异常 避免异常导致程序宕机
                .durable(true)                      // 持久化
                .build();
    }

    //创建队列对象添加到spring容器中
    @Bean
    public Queue payQueue(){
        return QueueBuilder.durable("pay.queue")
                .build();
    }

    //创建绑定对象添加到spring容器中
    @Bean
    public Binding payBinding(Queue payQueue, Exchange payExchange){
        return BindingBuilder.bind(payQueue())     // 绑定队列
                .to(payExchange())                // 到交换机
                .with("pay.*")          // 路由键
                .noargs();                        // 参数
    }

}

消费者

package com.example.demo04.stock.consumer.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author Allen
 * 4/11/2024 9:30 PM
 * @version 1.0
 * @description: 消费者:库存监听器,监听库存消息,进行消费
 */
@Component
public class StockListener {
    //监听订单支付成功的消息
    //RabbitListener标注的方法,就是一个消费者;会自动监听指定的队列,当有消息到达时,会自动调用该方法进行消费
/*    @RabbitListener(
            //如果消费的队列其他地方已经创建,可以直接使用队列名称绑定队列进行消费
            queues = {"pay.queue"}
    )
    public void test(String orderToken){
        System.out.println("stock服务接收到订单支付成功的消息:"+orderToken);
    }*/

/*
    springboot整合mq:
        消费者默认消息确认方式:
            出现异常,消息会重回队列,再次尝试重新消费
            如果一直出现异常 会出现死循环消费
*/

    //库存服务自定义队列绑定到订单服务的交换机上,进行消费
    @RabbitListener(
            //如果消费的队列其他地方已经创建,可以直接使用队列名称绑定队列进行消费
            bindings = @QueueBinding(
                    value = @Queue(name = "stock.pay.queue", durable = "true"),     //创建队列
                    //@Exchange:绑定交换机;如果交换机已存在,新创建的交换机必须和已存在的交换机配置一致
                    exchange = @Exchange(name = "pay.exchange", type = "topic",ignoreDeclarationExceptions = "true"),  //绑定交换机
                    key = {"pay.#"}   //绑定路由key
            )
    )
    //参数1:消息体内容 参数2:消息对象  参数3:消息通道
    public void paySuccess(String orderToken, Message message, Channel channel) throws IOException {

        try {
//            int a = 1/0;
            System.out.println("stock服务接收到订单支付成功的消息:"+orderToken);
//        手动ACK
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            System.out.println("手动ACK,出现异常");
            Boolean flag = message.getMessageProperties().isRedelivered();  //是否是重回队列的消息
            if (flag) {
                System.out.println("消息已经重回队列且再次消费出现异常被捕获,丢弃信息");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
//                除了丢弃消息,还可以绑定死信队列,将消息转发到死信队列

            }else {
                System.out.println("消息第一次消费出现异常被捕获,重新回到队列中");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            }
        }
    }
    /*
    *   手动ACK时:
    *       业务方法可能会出现异常,导致消息一致不能正确ACK,一直处于unacked状态,直到消费者进程停止
    *       如果是暂时的网络波动导致本次消费异常,过一会网络正常就可以正确消费消息了,但是消息不会重新回到队列中
    *   为了避免消费失败的信息一直不能被处理处于unacked状态
    *       解决:
    *           1:手动ack时有编译时的异常,try catch时可以捕获最大的异常
    *           2:当第一次消费失败时进入到了catch中,可以将消息重新回到队列中(unacked 转换为ready),让消费者再次尝试消费
    *           3:当第二次消费失败时进入到了catch中,可以丢弃消息(私信队列:丢弃信息的队列如果绑定了死信队列,消息到达死信队列,如果没有绑定消息丢弃)
    *
    *       如果消息无关紧要,可以丢失
    *           重要消息,必须保证消息的可靠性(不丢失)
    *               1:生产者确认
    *               2:交换机队列消息持久化
    *               3:消费者手动ACK
    *                   手动ACK时,消息重复消费失败会丢弃掉,可以给该队列绑定死信队列
    *           私信队列:本质就是一个队列
    *              一般会为死信队列创建 交换机 队列 特定路由Key绑定
    *               需要使用死信队列的队列,设置额外的参数(死信队列参数,丢弃信息时保存信息的死信配置)
    *
    *           队列可以配置的参数:mq的控制台,再新建Queue时,可以看到队列可以配置的参数
    *               x-max-length:队列存储的最大信息长度
    *               x-max-length-bytes:队列所有信息的总字节数限制
    *                           以上两个配置 哪个先到达阈值哪个先生效
    *               x-dead-letter-exchange:死信交换机
    *                            队列消息丢弃由该交换机处理
    *               x-dead-letter-routing-key:死信路由key
    *                            队列消息丢弃  死信交换机会将消息使用该路由key分发
    *               x-message-ttl:消息的过期时间 单位毫秒
    *                           如果队列的消息到达设置时间的过期时间仍然未被消费 消息会被丢弃
    *                       以后队列消息丢弃时的情况:
    *                           1:代码手动丢弃
    *                           2:消息队列已满存不下去新的消息时 会被丢弃
    *                           3:消息队列设置了过期时间,消息过期后会被丢弃
    *
    *
    *
    *
    * */

    //测试死信
    //业务队列的消费者
    @RabbitListener(queues = "business.queue")
    public void businessListener(Message message, Channel channel , String msg) throws IOException {
        try {
            int a =1 / 0;
            //消费消息的业务代码
            System.out.println("测试死信队列:所接受到的business队列的消息:"+msg);
            //手动ACK 参数1:消息的tag 参数2:是否批量确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            if(message.getMessageProperties().isRedelivered()){
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }else {
                //消息重新回到队列中 参数1:消息的tag 参数2:是否批量确认 参数3:是否重新回到队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            }
        }
    }



}
config文件
package com.example.demo04.stock.consumer.config;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.Map;

/**
 * @author Allen
 * 4/12/2024 11:27 AM
 * @version 1.0
 *
 * @description: 库存服务的MQ配置类
 *      配置死信交换机、死信队列、绑定关系
 *      解释死信队列:
 *      1、死信队列:当消息被拒绝、消息过期、队列达到最大长度时,消息会进入死信队列
 *      2、死信交换机:死信队列绑定的交换机
 *      3、死信路由key:消息进入死信队列时,会携带一个路由key
 *      4、死信队列绑定死信交换机
 *      5、业务队列绑定死信交换机
 *      6、业务队列绑定业务交换机
 */
@Configuration
public class StockMqConfig {
    //死信
    //1、声明死信交换机
    @Bean
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange("dead.exchange")
                .ignoreDeclarationExceptions()
                .build();
    }
    //2、声明死信队列
    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable("dead.queue")
                .build();
    }
    //3、死信队列绑定死信交换机
    @Bean
    public Binding deadBinding(Exchange deadExchange, Queue deadQueue){
        return BindingBuilder.bind(deadQueue)
                .to(deadExchange)
                .with("dead.msg")
                .noargs();
    }

    //业务
    //1、声明业务交换机
    @Bean
    public Exchange businessExchange(){
        return ExchangeBuilder.topicExchange("business.exchange")
                .ignoreDeclarationExceptions()
                .build();
    }
    //2、声明业务队列:业务队列绑定死信交换机
    @Bean
    public Queue businessQueue(){
        //以后business.queue队列丢弃消息时,会自动转发到dead.exchange交换机上,路由key为dead.msg
        return QueueBuilder.durable("business.queue")
                .deadLetterExchange("dead.exchange")
                .deadLetterRoutingKey("dead.msg")
                .build();
    }
    //3、业务队列绑定业务交换机
    @Bean
    public Binding businessBinding(Exchange businessExchange, Queue businessQueue){
        return BindingBuilder.bind(businessQueue)
                .to(businessExchange)
                .with("stock.bussiness")
                .noargs();
    }
}

延迟队列

重点:交换机根据路由key来转发信息

生产者

@Test
void testDelayQueue() {
    rabbitTemplate.convertAndSend("buss2.Exchange",
            "buss2.msg",
            "我是测试延迟队列,所发送到buss2的消息!!!");
}

消费者

//延迟队列的消费者 监听器
@RabbitListener(queues = "delay.Queue")
public void delayListener(Message message,Channel channel,String msg) throws IOException {
    try {
        System.out.println("延迟队列的消费者接收到的消息:"+msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    } catch (Exception e) {
        if(message.getMessageProperties().isRedelivered()){
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
        }else {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
        }
    }
}
config文件
package com.example.demo04.stock.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Allen
 * 4/12/2024 2:34 PM
 * @version 1.0
 * @description: 延迟队列配置
 */
@Configuration
public class DelayMqConfig {
    //延迟队列
    //1:创建延迟队列 死信队列
    @Bean
    public Exchange delayExchange() {
        return ExchangeBuilder.topicExchange("delay.Exchange")
                .ignoreDeclarationExceptions()
                .durable(true)
                .build();
    }

    //2:创建死信队列
    @Bean
    public Queue delayQueue() {
        return QueueBuilder
                .durable("delay.Queue")
                //绑定死信交换机、死信路由key
//                .deadLetterExchange("dead.exchange")
//                .deadLetterRoutingKey("dead.msg")
                .build();
    }

    @Bean
    public Binding delayBinding(Exchange delayExchange,Queue delayQueue) {
        return BindingBuilder.bind(delayQueue)
                .to(delayExchange)
                .with("delay.msg")
                .noargs();

    }
    //业务队列:设置过期时间,并且绑定死信交换机、死信路由key
    @Bean
    public Exchange buss2Exchange() {
        return ExchangeBuilder.topicExchange("buss2.Exchange")
                .ignoreDeclarationExceptions()
                .durable(true).
                build();
    }

    @Bean
    public Queue buss2Queue() {
        return QueueBuilder.durable("buss2.Queue")
                //利用死信交换机来设置延迟  =====>   延迟交换机
                //配置延迟交换机、死信路由key
                .deadLetterExchange("delay.Exchange")
                .deadLetterRoutingKey("delay.msg")
                //设置过期时间,单位毫秒,一旦消息过期,会自动转发到延迟队列(死信队列)
                .ttl(1*60*1000)
                .build();
    }

    @Bean
    public Binding buss2Binding(Exchange buss2Exchange,Queue buss2Queue) {
        return BindingBuilder.bind(buss2Queue)
                .to(buss2Exchange)
                .with("buss2.msg")
                .noargs();
    }
}

传输复杂类型数据

生产者

@Test
void testParam() {
    Map map = new HashMap<>();
    map.put("orderToken", "123456123");
    map.put("userId", "1001");
    map.put("amount", 100D);
    rabbitTemplate.convertAndSend("pay.exchange",
            "param.pay",map
            );
}

消费者

//接收复杂类型的消息
// 复杂类型会通过JAVA的序列化机制进行序列化,然后再进行传输到队列中
@RabbitListener(bindings = {
        @QueueBinding(
                value = @org.springframework.amqp.rabbit.annotation.Queue(value = "param.queue", durable = "true"),
                exchange = @org.springframework.amqp.rabbit.annotation.Exchange(name = "pay.exchange", type = ExchangeTypes.TOPIC),
                key = {"param.pay"}
        )
})
public void paramsListener(Message message , Channel channel, Map map) throws IOException {
    System.out.println("接收到的Map类型消息:" + map);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

面试题

为什么使用RabbitMQ

作用:异步、解耦和、削峰填谷

RabbitMQ的工作模式

simple
work
订阅模型(fanout,direct,topic)

可靠性

如何避免消息丢失

百分百避免消息丢失,项目性能非常差

(本地消息表:发送消息时,同时将消息作为日志存到本地消息表中 状态设置为 消息发送中、消息成功到达队列、交换机的回调方法中可以设置消息表中消息的状态,最后通过定时任务查找状态为发送失败的信息 重新发送)

一般业务场景:

1:生产者确认

2:持久化

3:消费者确认

手动ack

重新投递

丢弃消息到死信队列(人工干预)

重复消费

幂等性

因为生产者可能重新发送同一个消息,消费者消费异常时也会重新让消息归队

如果网络通信失败:生产者消费者已经发送,或者,重新让消息归队,或者,消费消息成功但跟mq交互时通信失败,都会导致消息以后再次被消费

和防止表单重复提交类似:

1、生产者发送消息时 可以携带一个token(唯一字符串 可以是一个id、也可以是自己生成的数据)

消费者消费时 验证token在本地数据库表中是否已存在/redis的set中是否已存在

如果已存在 表示消息已经被消费过 本次不处理

如果不存在 将token存到本地数据库表中/redis的string并设置过期时间

2、同一个队列中每个消息都有自己的唯一id

如何保证RabbitMQ消息的顺序性

一个队列 一个消费者

消息积压

消费者不足

1、增加部署消费者服务实例

2、开启能者多劳

一个消费者可以配置一次获取多个消息

开启多线程消费消息,提高并发能力

#能者多劳+多线程
#一次性获取消息的数量
prefetch: 3
#允许并发处理的线程数
concurrency: 3
#最大并发处理的线程数
max-concurrency: 3

标题:RabbitMQ搭建测试
作者:jiu5
地址:http://solo.jiufog.space/articles/2025/03/05/1741139298330.html