串行:业务中需要发请求,必须等请求响应过来了,才可以进行下一步的代码操作
消息传递:服务与服务之间 通过消息发送数据来通信,而不是互相调用
排队:指得是应用程序通过队列来通信
异步处理 A B C三个请求,B C不会影响A请求
解耦服务 避免每个服务之间的冗余联系
流量削峰 避免流量堆积
1:erlang
2:socat
3:RabbitMQ
31 cd /usr/lib/rabbitmq/
32 ls
33 cd bin/
34 ls
35 rabbitmq-plugins enable rabbitmq_management
36 systemctl restart rabbitmq-server
37 systemctl start rabbitmq-server
38 systemctl status rabbitmq-server
15672 web页面
25672 集群接口
5672 接受命令的端口
[root@localhost bin]# netstat -anp|grep beam
tcp 0 0 0.0.0.0:15672 0.0.0.0:* LISTEN 20728/beam.smp
tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN 20728/beam.smp
tcp 0 0 127.0.0.1:59040 127.0.0.1:4369 ESTABLISHED 20728/beam.smp
tcp6 0 0 :::5672 :::* LISTEN 20728/beam.smp
unix 3 [ ] STREAM CONNECTED 43996 20728/beam.smp
unix 3 [ ] STREAM CONNECTED 49040 20728/beam.smp
[root@localhost bin]#
[root@localhost ~]# rabbitmqctl add_user admin admin #添加用户
Adding user "admin" ...
[root@localhost ~]# rabbitmqctl set_user_tags admin administrator #绑定权限
Setting tags for user "admin" to [administrator] ...
[root@localhost ~]# rabbitmqctl change_password admin 123456 #修改密码
Changing password for user "admin" ...
Connections:客户端和RabbitMQ所建立的连接
Channels:通道 传输数据
Exchanges:交换机 将数据转发到队列中
Queues:队列
Consumers:消费者
**Broker:**接收和分发消息的应用,RabbitMQ Server就是 Message Broker
**Virtual host:**出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
**Connection:**publisher/consumer 和 broker 之间的 TCP 连接
**Channel:**如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
**Exchange:**message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
**Queue:**消息最终被送到这里等待 consumer 取走
**Binding:**exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>demo02-mq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo02-mq</name>
<description>demo02-mq</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.6.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<configuration>
<mainClass>com.example.demo02.mq.Demo02MqApplication</mainClass>
<skip>true</skip>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.example.demo02.mq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Allen
* 4/10/2024 7:25 PM
* @version 1.0
* @description: MQ连接工具类
*
*/
public class ConnectionUtils {
//为什么使用静态代码块初始化连接工厂?
//因为连接工厂只需要初始化一次,所以使用静态代码块初始化
private static ConnectionFactory connectionFactory;
static {
// 创建连接工厂
connectionFactory = new ConnectionFactory();
//mq服务主机地址
connectionFactory.setHost("ningbo-3689d402.of-7af93c01.shop");
//连接端口
connectionFactory.setPort(40991);
connectionFactory.setVirtualHost("/my240410");
//设置用户名
connectionFactory.setUsername("allen");
//设置密码
connectionFactory.setPassword("123456");
}
public static Connection getConnection() {
try {
//返回连接 通过工厂获取连接
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
@SpringBootTest
class Demo02MqApplicationTests {
@Test
void contextLoads() {
Connection connection = ConnectionUtils.getConnection();
System.out.println(connection);
}
}
结果:
amqp://allen@110.42.14.112:40991//my240410
* 操作MQ
* 生产者:发送消息
* 消费者:消费信息
*
* 五大消息模型:
* 1、simple消息模型
* 一个生产者、一个消费者、一个队列
* 2、work消息模型
* 一个生产者、多个消费者、一个队列
* 能者多劳:消费者性能高就可以多消费一些信息
* 3、fanout消息模型
* 一个生产者、一个交换机、多个队列、多个消费者
* 怎么实现一个生产者发送消息给多个消费者?
* 生产者发送消息给交换机,交换机发送消息给队列,队列发送消息给消费者
* fanout:广播的意思,发送消息给所有绑定的队列
* 4、direct消息模型
* 一个生产者、一个交换机、多个队列、多个消费者 (队列绑定交换机时需要指定routingKey)
* 5、topic消息模型
* 一个生产者/多个生产者、一个交换机、多个队列、多个消费者 (队列绑定交换机时使用的routingKey是通配符)
* *:通配一级任意多个字符
* #:通配多级任意多个字符
* 例如:user.*:匹配user.add、user.delete
* user.#:匹配user.add、user.delete、user.add.delete
package com.example.demo02.mq.simple;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author Allen
* 4/10/2024 8:07 PM
* @version 1.0
* @description: 简单模型-生产者
*/
public class SimpleSender {
public static void main(String[] args) throws Exception {
// 1、创建连接
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道
Channel channel = connection.createChannel();
// 3、声明队列:队列可以缓存数据、mq的交换机不能存储数据 simple模型使用的是默认交换机
// 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:其他属性
channel.queueDeclare("simple_queue", false, false, false, null);
// 4、发送消息:发送到队列中
// 在simple模型中 参数1:交换机 参数2:队列名称 参数3:传递消息额外设置 参数4:消息的具体内容(任意数据类型)
String msg = "hello rabbitmq";
channel.basicPublish("", "simple_queue", null, msg.getBytes());
// 5、关闭通道
channel.close();
// 6、关闭连接
connection.close();
}
}
package com.example.demo02.mq.simple;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author Allen
* 4/10/2024 8:59 PM
* @version 1.0
* @description: 简单模型-消费者
*/
public class SimpleReciver {
public static void main(String[] args) throws Exception {
// 1、创建连接
Connection connection = ConnectionUtils.getConnection();
// 2、创建通道
Channel channel = connection.createChannel();
// 3、声明队列:如果已声明可以忽略
// 4、监听队列:
channel.queueDeclare("simple_queue", false, false, false, null);
Consumer consumer = new DefaultConsumer(channel){
// 消息的具体处理逻辑
// 消费者接收到消息后调用此方法
// 如果AutoAck为true,此方法一旦被调用,消息会被确认
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者接收到消息:"+new String(body));
/*
channel.basicReject(); //拒绝消息,可以丢弃消息,也可以重新入队
channel.basicNack(); //不确认消息,可以设置是否重新入队
*/
channel.basicAck(envelope.getDeliveryTag(),false); //手动确认消息,参数1:消息的标识,参数2:是否开启多个消息同时确认
}
};
/* 参数1:队列名称 参数2:是否自动确认 参数3:消费者
队列名称:消费哪个队列的消息
AutoAck:是否自动确认,如果为true,表示消息一旦被接收,自动向mq回复接收到,mq会删除消息,如果为false,需要手动确认,如果消费者挂掉,消息会被重回队列
Consumer:消费者对象
channel.basicConsume("simple_queue", true,consumer);
避免消息丢失,手动确认
*/
channel.basicConsume("simple_queue", false,consumer);
}
}
Unacked : 消费者用了、但是没有告诉交换机、就是未确认的状态
当消息被消费了,且告诉交换机了,交换机就会将这个消息删除
* work模型:
* 多个消费者消费同一个队列中的消息,每个消费者获取到的消息唯一,且只能消费一次
* 作用:提高消息的消费速度,避免消息的堆积
* 默认采用轮询的方式分发消息
* 如果某个消费者处理消息慢,会导致消息堆积
package com.example.demo02.mq.work;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author Allen
* 4/10/2024 9:37 PM
* @version 1.0
* @description: work模式发送者
*
* work模型:
* 多个消费者消费同一个队列中的消息,每个消费者获取到的消息唯一,且只能消费一次
* 作用:提高消息的消费速度,避免消息的堆积
* 默认采用轮询的方式分发消息
* 如果某个消费者处理消息慢,会导致消息堆积
*/
public class WorkSender {
public static void main(String[] args) throws Exception {
// 1:获取连接
Connection connection = ConnectionUtils.getConnection();
// 2:创建通道
Channel channel = connection.createChannel();
// 3:声明队列
// 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:队列的属性
channel.queueDeclare("work.queue", false, false, false, null);
// 4:发送100条消息
for (int i = 0; i < 100; i++) {
String msg = "work模式消息" + i;
//休眠i*5毫秒
TimeUnit.MILLISECONDS.sleep(i * 5);
// 参数1:交换机名称 参数2:队列名称 参数3:消息的其他属性 参数4:消息的内容
channel.basicPublish("", "work.queue", null, msg.getBytes());
System.out.println("work模式发送消息:" + msg);
}
// 5:关闭通道
channel.close();
// 6:关闭连接
connection.close();
}
}
(能者多劳角色)
package com.example.demo02.mq.work;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author Allen
* 4/10/2024 9:37 PM
* @version 1.0
* @description: work模式消费者1号
*/
public class WorkReciver1 {
public static void main(String[] args) throws Exception {
// 1:获取连接
Connection connection = ConnectionUtils.getConnection();
// 2:创建通道
Channel channel = connection.createChannel();
// 3:声明队列
// 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:队列的属性
channel.queueDeclare("work.queue", false, false, false, null);
// 4:定义消费者,消费消息
// 参数1:队列名称 参数2:是否自动确认消息 参数3:消费者对象
Consumer consumer = new DefaultConsumer(channel) {
// 消费者接收消息调用此方法
// 参数1:消费者标签 参数2:队列参数 参数3:消息属性 参数4:消息内容
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 获取消息
String msg = new String(body);
System.out.println("work模式消费者1号接收消息:" + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("work.queue", false, consumer);
}
}
(消费能力差)
package com.example.demo02.mq.work;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author Allen
* 4/10/2024 9:37 PM
* @version 1.0
* @description: work模式消费者1号
*/
public class WorkReciver2 {
public static void main(String[] args) throws Exception {
// 1:获取连接
Connection connection = ConnectionUtils.getConnection();
// 2:创建通道
Channel channel = connection.createChannel();
// 3:声明队列
// 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:队列的属性
channel.queueDeclare("work.queue", false, false, false, null);
//如果此消费者性能较差,配置能者多劳:指定一次获取几条信息,消息消费成功后 ack之后 mq才会发送下一条消息
channel.basicQos(1);
// 4:定义消费者,消费消息
// 参数1:队列名称 参数2:是否自动确认消息 参数3:消费者对象
Consumer consumer = new DefaultConsumer(channel) {
// 消费者接收消息调用此方法
// 参数1:消费者标签 参数2:队列参数 参数3:消息属性 参数4:消息内容
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//模拟二号消费者处理消息慢
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 获取消息:执行业务
String msg = new String(body);
System.out.println("work模式消费者2号接收消息:" + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 参数1:队列名称 参数2:ACK是否自动确认 参数3:消费者对象
//必须手动确认消息,否则会报406错误
channel.basicConsume("work.queue", false, consumer);
}
}
能者多劳
* 广播模型:
* 一个交换机绑定多个队列
* 每个队列都有一个消费者
* 每个消费者消费自己队列中的消息,每个队列的信息是一样的
package com.example.demo02.mq.fanout;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author Allen
* 4/11/2024 8:24 AM
* @version 1.0
* @description: 广播模型发送者
*
* 广播模型:
* 一个交换机绑定多个队列
* 每个队列都有一个消费者
* 每个消费者消费自己队列中的消息,每个队列的信息是一样的
*/
public class FanoutSender {
public static void main(String[] args) throws Exception {
// 1:获取连接
Connection connection = ConnectionUtils.getConnection();
// 2:创建通道
Channel channel = connection.createChannel();
// 3:声明交换机
// 参数1:交换机名称 参数2:交换机类型 (fanout direct topic) 参数3:是否持久化
/*
fanout:广播模式
绑定了这个交换机的队列都会收到消息
direct:路由模式
通过路由键完全匹配的队列会收到消息
topic:通配符模式
通过通配符匹配的队列会收到消息
*/
channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT,false);
// 交换机不会存储消息,只是负责消息的转发,如果没有队列绑定到交换机上,消息会丢失
// 4:发送消息到交换机:需要消费信息的消费者自己声明自己的队列绑定到当前交换机上
String msg = "fanout message";
channel.basicPublish("fanout.exchange", "", null, msg.getBytes());
// 5:关闭通道
channel.close();
// 6:关闭连接
connection.close();
}
}
package com.example.demo02.mq.fanout;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author Allen
* 4/11/2024 8:55 AM
* @version 1.0
* @description: 广播模型接收者
*/
public class FanoutReceiver1 {
public static void main(String[] args) throws Exception {
// 1:获取连接
Connection connection = ConnectionUtils.getConnection();
// 2:创建通道
Channel channel = connection.createChannel();
// 3:声明交换机
//为什么消费者也得声明交换机?如果消费者先启动,那么交换机还没有声明,消费者就会报错,所以消费者也得声明交换机
// 参数1:交换机名称 参数2:交换机类型 参数3:是否持久化
channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT,false);
// 4:声明队列
// 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:其他参数
channel.queueDeclare("fanout.queue1", false, false, false, null);
// 5:绑定自己的队列到交换机
channel.queueBind("fanout.queue1", "fanout.exchange", "");
// 6:消费消息
Consumer consumer = new DefaultConsumer(channel){
@Override
// 参数1:消费者标签 参数2:消息传递参数 参数3: 参数4:消息内容
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消费消息
System.out.println("Fanout1接收到的消息是:" + new String(body));
// 手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("fanout.queue1",false,consumer);
}
}
package com.example.demo02.mq.fanout;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author Allen
* 4/11/2024 8:55 AM
* @version 1.0
* @description: 广播模型接收者
*/
public class FanoutReceiver2 {
public static void main(String[] args) throws Exception {
// 1:获取连接
Connection connection = ConnectionUtils.getConnection();
// 2:创建通道
Channel channel = connection.createChannel();
// 3:声明交换机
//为什么消费者也得声明交换机?如果消费者先启动,那么交换机还没有声明,消费者就会报错,所以消费者也得声明交换机
channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT,false);
// 4:声明队列
// 参数1:队列名称 参数2:是否持久化 参数3:是否排他性 参数4:是否自动删除 参数5:其他参数
channel.queueDeclare("fanout.queue2", false, false, false, null);
// 5:绑定队列到交换机
channel.queueBind("fanout.queue2", "fanout.exchange", "");
// 6:消费消息
Consumer consumer = new DefaultConsumer(channel){
@Override
// 参数1:消费者标签 参数2:消息传递参数 参数3: 参数4:消息内容
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消费消息
System.out.println("Fanout2接收到的消息是:" + new String(body));
// 手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("fanout.queue2",false,consumer);
}
}
当消息很多的时候,需要指定的路由键也会很多,究极复杂。
###### 生产者
package com.example.demo02.mq.direct;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author Allen
* 4/11/2024 9:30 AM
* @version 1.0
* @description: 路由模型发送者
*
* 路由模型:
* 一个交换机可以绑定多个队列
* 生产者给交换机发送消息时,需要指定消息的路由键
* 消费者绑定队列到交换机时,需要指定所需要消费的信息的路由键
* 交换机会根据消息的路由键将消息转发到对应的队列
* 缺点:
* 当消息很多的时候,需要指定的路由键也会很多,究极复杂。
*/
public class DirectSender {
public static void main(String[] args) throws Exception {
// 1:创建连接
Connection connection = ConnectionUtils.getConnection();
// 2:创建通道
Channel channel = connection.createChannel();
// 3:声明交换机 参数1:交换机名称 参数2:交换机类型 参数3:是否持久化
channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, false);
// 6:发送消息
String msg1 = "{To DirectReceiver1: orderId:1001}";
String msg2 = "{To DirectReceiver2: orderId:1002}";
// 参数1:交换机 参数2:路由键(与消费者相匹配) 参数3:其他参数 参数4:消息内容
channel.basicPublish("direct.exchange","order.save",null,msg1.getBytes());
channel.basicPublish("direct.exchange","order.update",null,msg2.getBytes());
// 7:关闭通道
channel.close();
// 8:关闭连接
connection.close();
}
}
package com.example.demo02.mq.direct;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author Allen
* 4/11/2024 9:44 AM
* @version 1.0
* @description: 路由模型接收者1
*/
public class DirectReceiver1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, false);
channel.queueDeclare("direct.queue1", false, false, false, null);
channel.queueBind("direct.queue1","direct.exchange","order.save");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("DirectReceiver1接收到的新增订单消息是:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("direct.queue1",false,consumer);
}
}
package com.example.demo02.mq.direct;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author Allen
* 4/11/2024 9:44 AM
* @version 1.0
* @description: 路由模型接收者2
*/
public class DirectReceiver2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, false);
channel.queueDeclare("direct.queue2", false, false, false, null);
channel.queueBind("direct.queue2","direct.exchange","order.update");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("DirectReceiver2接收到的修改订单消息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("direct.queue2",false,consumer);
}
}
* 通配符模型
* 生产者必须指定完整且准确的路由key
* 消费者可以使用通配符
* *:可以替代一级的任意字符 add.* ==> add.user add.goods
* #:可以替代多级的任意字符 add.# ==> add.user.name add.user.name.firstName
package com.example.demo02.mq.topic;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author Allen
* 4/11/2024 10:15 AM
* @version 1.0
* @description: 通配符模型发送者
* 通配符模型
* 生产者必须指定完整且准确的路由key
* 消费者可以使用通配符
* *:可以替代一级的任意字符 add.* ==> add.user add.goods
* #:可以替代多级的任意字符 add.# ==> add.user.name add.user.name.firstName
*/
public class TopicSender {
public static void main(String[] args) throws Exception {
// 1:获取连接
Connection connection = ConnectionUtils.getConnection();
// 2:创建通道
Channel channel = connection.createChannel();
// 3:声明交换机
channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,false);
// 4:发送消息 路由Key写法 goods.add 不要使用通配符
String msg1 = "商品新增了,Topic模型,routing key 为 goods.add";
String msg2 = "商品修改了,Topic模型,routing key 为 goods.update";
String msg3 = "商品删除了,Topic模型,routing key 为 goods.delete";
String msg4 = "用户新增了,Topic模型,routing key 为 user.add";
String msg5 = "用户修改了,Topic模型,routing key 为 user.update";
String msg6 = "用户删除了,Topic模型,routing key 为 user.delete";
String msg7 = "添加了用户名字,Topic模型,routing key 为 user.add.name";
String msg8 = "添加了用户年龄,Topic模型,routing key 为 user.add.age";
String msg9 = "修改了用户名字,Topic模型,routing key 为 user.update.name";
String msg10 = "修改了用户年龄,Topic模型,routing key 为 user.update.age";
channel.basicPublish("topic.exchange","goods.add",null,msg1.getBytes());
channel.basicPublish("topic.exchange","goods.update",null,msg2.getBytes());
channel.basicPublish("topic.exchange","goods.delete",null,msg3.getBytes());
channel.basicPublish("topic.exchange","user.add",null,msg4.getBytes());
channel.basicPublish("topic.exchange","user.update",null,msg5.getBytes());
channel.basicPublish("topic.exchange","user.delete",null,msg6.getBytes());
channel.basicPublish("topic.exchange","user.add.name",null,msg7.getBytes());
channel.basicPublish("topic.exchange","user.add.age",null,msg8.getBytes());
channel.basicPublish("topic.exchange","user.update.name",null,msg9.getBytes());
channel.basicPublish("topic.exchange","user.update.age",null,msg10.getBytes());
// 5:关闭连接
channel.close();
connection.close();
}
}
package com.example.demo02.mq.topic;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author Allen
* 4/11/2024 10:22 AM
* @version 1.0
*
* @description: 通配符模型接收者1
*/
public class TopicReceiver1 {
public static void main(String[] args) throws Exception {
// 1:获取连接
Connection connection = ConnectionUtils.getConnection();
// 2:创建通道
Channel channel = connection.createChannel();
// 3:声明交换机
channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,false);
// 4:声明队列
channel.queueDeclare("topic.queue1", false, false, false, null);
// 5:绑定队列到交换机 使用通配符* 一级任意字符 # 多级任意字符
channel.queueBind("topic.queue1", "topic.exchange", "goods.*");
// 6:消费消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("商品模块接收到的消息是:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("topic.queue1",false,consumer);
}
}
package com.example.demo02.mq.topic;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author Allen
* 4/11/2024 10:22 AM
* @version 1.0
*
* @description: 通配符模型接收者2
*/
public class TopicReceiver2 {
public static void main(String[] args) throws Exception {
// 1:获取连接
Connection connection = ConnectionUtils.getConnection();
// 2:创建通道
Channel channel = connection.createChannel();
// 3:声明交换机
channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,false);
// 4:声明队列
channel.queueDeclare("topic.queue2", false, false, false, null);
// 5:绑定队列到交换机 使用通配符 user.* user.#
channel.queueBind("topic.queue2", "topic.exchange", "user.#");
// 6:消费消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("用户模块接收到的消息是:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("topic.queue2",false,consumer);
}
}
会造成的问题:
生产者发送消息失败导致丢失:
1:没有发送到mq的交换机(交换机不存在、网络通信失败)
2:交换机没有绑定队列(交换机会丢弃信息)
解决:生产者确认回调:
rabbitTemplate可以设置 生产者确认回调
1、消息是否到达交换机回调
2、消息没有到达队列的回调
mq宕机导致丢失:
默认mq 交换机队列 消息没有持久化,mq非法关闭的时候,数据会丢失
声明队列交换机发送信息时都配置持久化
消费者消费消息导致丢失 手动ACK
交换机
队列
都需要持久化
使用了持久化的交换机和消息,宕机之后不会被清除
对交换机发送消息,需要指定持久化数据类型
生产者
// 参数1:交换机名称 参数2:交换机类型 参数3:是否持久化
channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,true);
//参数1:交换机名称 参数2:路由key 参数3:消息持久化的数据类型配置 参数4:消息内容
channel.basicPublish("topic.exchange","goods.add", MessageProperties.PERSISTENT_TEXT_PLAIN,msg1.getBytes());
消费者
// 3:声明交换机
channel.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC,true);
使用了持久化的队列 可以保存住队列里面的消息
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.allen</groupId>
<artifactId>demo03-order-provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo03-order-provider</name>
<description>demo03-order-provider</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.6.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<configuration>
<mainClass>com.allen.demo03.order.Demo03OrderProviderApplication</mainClass>
<skip>true</skip>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
application.yaml文件
#端口号
server:
port: 8110
spring:
application:
name: order-service
#虚拟机重启之后,rabbitmq服务需要重启 systemctl restart rabbitmq-server
#rabbitmq配置
rabbitmq:
host: ningbo-3689d402.of-7af93c01.shop
port: 40991
username: admin
password: admin
virtual-host: /my240410
#配置mq生产者确认回调 默认是false
publisher-returns: true
#correlated: 使用额外的线程池的线程来调用回调方法
#simple: 使用发布者的线程池的线程来调用回调方法
publisher-confirm-type: correlated
死信交换机
死信队列
死信路由key
* 配置死信交换机、死信队列、绑定关系
* 解释死信队列:
* 1、死信队列:当消息被拒绝、消息过期、队列达到最大长度时,消息会进入死信队列
* 2、死信交换机:死信队列绑定的交换机
* 3、死信路由key:消息进入死信队列时,会携带一个路由key
* 4、死信队列绑定死信交换机
* 5、业务队列绑定死信交换机
* 6、业务队列绑定业务交换机
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("business.exchange",
"stock.bussiness",
"我是测试死信队列,所发送到business的消息!!!");
}
配置发送消息到交换机、队列的回调
也可以配置交换机和队列
package com.allen.demo03.order.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
* @author Allen
* 4/11/2024 2:08 PM
* @version 1.0
*/
@Configuration
public class RabbitmqConfig implements InitializingBean {
@Resource
RabbitTemplate rabbitTemplate;
//spring管理bean时提供的生命周期方法,可以在当前类对象属性值初始化之后 容器调用afterPropertiesSet方法
@Override
public void afterPropertiesSet() throws Exception {
//确认消息是否到达交换机的回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if(!ack){
System.out.println("消息没有到达交换机");
System.out.println("消息是否到达交换机:"+ack);
System.out.println(ack?"":"未到达原因:"+cause);
System.out.println("消息:"+correlationData);
}
});
//消息没有到达队列的回调
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)->{
System.out.println("消息没有到达队列");
System.out.println("消息:"+message);
System.out.println("应答码:"+replyCode);
System.out.println("原因:"+replyText);
System.out.println("交换机:"+exchange);
System.out.println("路由键:"+routingKey);
});
}
//创建交换机对象添加到spring容器中
@Bean
public Exchange payExchange(){
return ExchangeBuilder.topicExchange("pay.exchange")
.ignoreDeclarationExceptions() // 忽略声明异常 避免异常导致程序宕机
.durable(true) // 持久化
.build();
}
//创建队列对象添加到spring容器中
@Bean
public Queue payQueue(){
return QueueBuilder.durable("pay.queue")
.build();
}
//创建绑定对象添加到spring容器中
@Bean
public Binding payBinding(Queue payQueue, Exchange payExchange){
return BindingBuilder.bind(payQueue()) // 绑定队列
.to(payExchange()) // 到交换机
.with("pay.*") // 路由键
.noargs(); // 参数
}
}
package com.example.demo04.stock.consumer.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author Allen
* 4/11/2024 9:30 PM
* @version 1.0
* @description: 消费者:库存监听器,监听库存消息,进行消费
*/
@Component
public class StockListener {
//监听订单支付成功的消息
//RabbitListener标注的方法,就是一个消费者;会自动监听指定的队列,当有消息到达时,会自动调用该方法进行消费
/* @RabbitListener(
//如果消费的队列其他地方已经创建,可以直接使用队列名称绑定队列进行消费
queues = {"pay.queue"}
)
public void test(String orderToken){
System.out.println("stock服务接收到订单支付成功的消息:"+orderToken);
}*/
/*
springboot整合mq:
消费者默认消息确认方式:
出现异常,消息会重回队列,再次尝试重新消费
如果一直出现异常 会出现死循环消费
*/
//库存服务自定义队列绑定到订单服务的交换机上,进行消费
@RabbitListener(
//如果消费的队列其他地方已经创建,可以直接使用队列名称绑定队列进行消费
bindings = @QueueBinding(
value = @Queue(name = "stock.pay.queue", durable = "true"), //创建队列
//@Exchange:绑定交换机;如果交换机已存在,新创建的交换机必须和已存在的交换机配置一致
exchange = @Exchange(name = "pay.exchange", type = "topic",ignoreDeclarationExceptions = "true"), //绑定交换机
key = {"pay.#"} //绑定路由key
)
)
//参数1:消息体内容 参数2:消息对象 参数3:消息通道
public void paySuccess(String orderToken, Message message, Channel channel) throws IOException {
try {
// int a = 1/0;
System.out.println("stock服务接收到订单支付成功的消息:"+orderToken);
// 手动ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
System.out.println("手动ACK,出现异常");
Boolean flag = message.getMessageProperties().isRedelivered(); //是否是重回队列的消息
if (flag) {
System.out.println("消息已经重回队列且再次消费出现异常被捕获,丢弃信息");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
// 除了丢弃消息,还可以绑定死信队列,将消息转发到死信队列
}else {
System.out.println("消息第一次消费出现异常被捕获,重新回到队列中");
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
/*
* 手动ACK时:
* 业务方法可能会出现异常,导致消息一致不能正确ACK,一直处于unacked状态,直到消费者进程停止
* 如果是暂时的网络波动导致本次消费异常,过一会网络正常就可以正确消费消息了,但是消息不会重新回到队列中
* 为了避免消费失败的信息一直不能被处理处于unacked状态
* 解决:
* 1:手动ack时有编译时的异常,try catch时可以捕获最大的异常
* 2:当第一次消费失败时进入到了catch中,可以将消息重新回到队列中(unacked 转换为ready),让消费者再次尝试消费
* 3:当第二次消费失败时进入到了catch中,可以丢弃消息(私信队列:丢弃信息的队列如果绑定了死信队列,消息到达死信队列,如果没有绑定消息丢弃)
*
* 如果消息无关紧要,可以丢失
* 重要消息,必须保证消息的可靠性(不丢失)
* 1:生产者确认
* 2:交换机队列消息持久化
* 3:消费者手动ACK
* 手动ACK时,消息重复消费失败会丢弃掉,可以给该队列绑定死信队列
* 私信队列:本质就是一个队列
* 一般会为死信队列创建 交换机 队列 特定路由Key绑定
* 需要使用死信队列的队列,设置额外的参数(死信队列参数,丢弃信息时保存信息的死信配置)
*
* 队列可以配置的参数:mq的控制台,再新建Queue时,可以看到队列可以配置的参数
* x-max-length:队列存储的最大信息长度
* x-max-length-bytes:队列所有信息的总字节数限制
* 以上两个配置 哪个先到达阈值哪个先生效
* x-dead-letter-exchange:死信交换机
* 队列消息丢弃由该交换机处理
* x-dead-letter-routing-key:死信路由key
* 队列消息丢弃 死信交换机会将消息使用该路由key分发
* x-message-ttl:消息的过期时间 单位毫秒
* 如果队列的消息到达设置时间的过期时间仍然未被消费 消息会被丢弃
* 以后队列消息丢弃时的情况:
* 1:代码手动丢弃
* 2:消息队列已满存不下去新的消息时 会被丢弃
* 3:消息队列设置了过期时间,消息过期后会被丢弃
*
*
*
*
* */
//测试死信
//业务队列的消费者
@RabbitListener(queues = "business.queue")
public void businessListener(Message message, Channel channel , String msg) throws IOException {
try {
int a =1 / 0;
//消费消息的业务代码
System.out.println("测试死信队列:所接受到的business队列的消息:"+msg);
//手动ACK 参数1:消息的tag 参数2:是否批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
if(message.getMessageProperties().isRedelivered()){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
//消息重新回到队列中 参数1:消息的tag 参数2:是否批量确认 参数3:是否重新回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
}
package com.example.demo04.stock.consumer.config;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.Map;
/**
* @author Allen
* 4/12/2024 11:27 AM
* @version 1.0
*
* @description: 库存服务的MQ配置类
* 配置死信交换机、死信队列、绑定关系
* 解释死信队列:
* 1、死信队列:当消息被拒绝、消息过期、队列达到最大长度时,消息会进入死信队列
* 2、死信交换机:死信队列绑定的交换机
* 3、死信路由key:消息进入死信队列时,会携带一个路由key
* 4、死信队列绑定死信交换机
* 5、业务队列绑定死信交换机
* 6、业务队列绑定业务交换机
*/
@Configuration
public class StockMqConfig {
//死信
//1、声明死信交换机
@Bean
public Exchange deadExchange(){
return ExchangeBuilder.topicExchange("dead.exchange")
.ignoreDeclarationExceptions()
.build();
}
//2、声明死信队列
@Bean
public Queue deadQueue(){
return QueueBuilder.durable("dead.queue")
.build();
}
//3、死信队列绑定死信交换机
@Bean
public Binding deadBinding(Exchange deadExchange, Queue deadQueue){
return BindingBuilder.bind(deadQueue)
.to(deadExchange)
.with("dead.msg")
.noargs();
}
//业务
//1、声明业务交换机
@Bean
public Exchange businessExchange(){
return ExchangeBuilder.topicExchange("business.exchange")
.ignoreDeclarationExceptions()
.build();
}
//2、声明业务队列:业务队列绑定死信交换机
@Bean
public Queue businessQueue(){
//以后business.queue队列丢弃消息时,会自动转发到dead.exchange交换机上,路由key为dead.msg
return QueueBuilder.durable("business.queue")
.deadLetterExchange("dead.exchange")
.deadLetterRoutingKey("dead.msg")
.build();
}
//3、业务队列绑定业务交换机
@Bean
public Binding businessBinding(Exchange businessExchange, Queue businessQueue){
return BindingBuilder.bind(businessQueue)
.to(businessExchange)
.with("stock.bussiness")
.noargs();
}
}
重点:交换机根据路由key来转发信息
@Test
void testDelayQueue() {
rabbitTemplate.convertAndSend("buss2.Exchange",
"buss2.msg",
"我是测试延迟队列,所发送到buss2的消息!!!");
}
//延迟队列的消费者 监听器
@RabbitListener(queues = "delay.Queue")
public void delayListener(Message message,Channel channel,String msg) throws IOException {
try {
System.out.println("延迟队列的消费者接收到的消息:"+msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
if(message.getMessageProperties().isRedelivered()){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
package com.example.demo04.stock.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Allen
* 4/12/2024 2:34 PM
* @version 1.0
* @description: 延迟队列配置
*/
@Configuration
public class DelayMqConfig {
//延迟队列
//1:创建延迟队列 死信队列
@Bean
public Exchange delayExchange() {
return ExchangeBuilder.topicExchange("delay.Exchange")
.ignoreDeclarationExceptions()
.durable(true)
.build();
}
//2:创建死信队列
@Bean
public Queue delayQueue() {
return QueueBuilder
.durable("delay.Queue")
//绑定死信交换机、死信路由key
// .deadLetterExchange("dead.exchange")
// .deadLetterRoutingKey("dead.msg")
.build();
}
@Bean
public Binding delayBinding(Exchange delayExchange,Queue delayQueue) {
return BindingBuilder.bind(delayQueue)
.to(delayExchange)
.with("delay.msg")
.noargs();
}
//业务队列:设置过期时间,并且绑定死信交换机、死信路由key
@Bean
public Exchange buss2Exchange() {
return ExchangeBuilder.topicExchange("buss2.Exchange")
.ignoreDeclarationExceptions()
.durable(true).
build();
}
@Bean
public Queue buss2Queue() {
return QueueBuilder.durable("buss2.Queue")
//利用死信交换机来设置延迟 =====> 延迟交换机
//配置延迟交换机、死信路由key
.deadLetterExchange("delay.Exchange")
.deadLetterRoutingKey("delay.msg")
//设置过期时间,单位毫秒,一旦消息过期,会自动转发到延迟队列(死信队列)
.ttl(1*60*1000)
.build();
}
@Bean
public Binding buss2Binding(Exchange buss2Exchange,Queue buss2Queue) {
return BindingBuilder.bind(buss2Queue)
.to(buss2Exchange)
.with("buss2.msg")
.noargs();
}
}
@Test
void testParam() {
Map map = new HashMap<>();
map.put("orderToken", "123456123");
map.put("userId", "1001");
map.put("amount", 100D);
rabbitTemplate.convertAndSend("pay.exchange",
"param.pay",map
);
}
//接收复杂类型的消息
// 复杂类型会通过JAVA的序列化机制进行序列化,然后再进行传输到队列中
@RabbitListener(bindings = {
@QueueBinding(
value = @org.springframework.amqp.rabbit.annotation.Queue(value = "param.queue", durable = "true"),
exchange = @org.springframework.amqp.rabbit.annotation.Exchange(name = "pay.exchange", type = ExchangeTypes.TOPIC),
key = {"param.pay"}
)
})
public void paramsListener(Message message , Channel channel, Map map) throws IOException {
System.out.println("接收到的Map类型消息:" + map);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
作用:异步、解耦和、削峰填谷
百分百避免消息丢失,项目性能非常差
(本地消息表:发送消息时,同时将消息作为日志存到本地消息表中 状态设置为 消息发送中、消息成功到达队列、交换机的回调方法中可以设置消息表中消息的状态,最后通过定时任务查找状态为发送失败的信息 重新发送)
一般业务场景:
1:生产者确认
2:持久化
3:消费者确认
手动ack
重新投递
丢弃消息到死信队列(人工干预)
幂等性
因为生产者可能重新发送同一个消息,消费者消费异常时也会重新让消息归队
如果网络通信失败:生产者消费者已经发送,或者,重新让消息归队,或者,消费消息成功但跟mq交互时通信失败,都会导致消息以后再次被消费
和防止表单重复提交类似:
1、生产者发送消息时 可以携带一个token(唯一字符串 可以是一个id、也可以是自己生成的数据)
消费者消费时 验证token在本地数据库表中是否已存在/redis的set中是否已存在
如果已存在 表示消息已经被消费过 本次不处理
如果不存在 将token存到本地数据库表中/redis的string并设置过期时间
2、同一个队列中每个消息都有自己的唯一id
一个队列 一个消费者
消费者不足
1、增加部署消费者服务实例
2、开启能者多劳
一个消费者可以配置一次获取多个消息
开启多线程消费消息,提高并发能力
#能者多劳+多线程
#一次性获取消息的数量
prefetch: 3
#允许并发处理的线程数
concurrency: 3
#最大并发处理的线程数
max-concurrency: 3