当前位置: 澳门新濠3559 > 编程 > 正文

这已经是,支持消息集群和分布式部署

时间:2019-10-06 23:43来源:编程
队列信息 RabbitMQ消费失败的处理 RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异

澳门新濠3559 1队列信息

RabbitMQ消费失败的处理

RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

修改一下消费者的代码:

//接收到消息事件
consumer.Received  = (ch, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);

    Console.WriteLine($"收到消息: {message}");

    Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
    Thread.Sleep(10000);
    //确认该消息已被消费
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
};

演示:

澳门新濠3559 2

从图中可以看出,设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被RabbitMQ重新投递。

在docker环境部署RabbitMQ

RabbitMQ是用 Erlang 编写的,直接部署的话需要先部署 Erlang 环境,比较麻烦。在 docker 环境下部署就比较简单了,直接使用rabbitmq官方提供的镜像即可。

登录 docker 节点,运行 docker pull rabbitmq:management,这里使用的是带 web 管理插件的镜像。

启动容器:

docker run -d --name rabbitmq --publish 5671:5671 
 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 
rabbitmq:management

澳门新濠3559,容器启动之后就可以访问web 管理端了 http://宿主机IP:15672,默认创建了一个 guest 用户,密码也是 guest

澳门新濠3559 3

二、概念及技术

关于在Docker下安装部署Rabbitmq,可点击Docker:安装部署RabbitMQ。

Fanout Exchange

澳门新濠3559 4

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

所以,Fanout Exchange 转发消息是最快的。

为了演示效果,定义了两个队列,分别为hello1,hello2,每个队列都拥有一个消费者。

static void Main(string[] args)
{
    string exchangeName = "TestFanoutChange";
    string queueName1 = "hello1";
    string queueName2 = "hello2";
    string routeKey = "";

    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "admin",//用户名
        Password = "admin",//密码
        HostName = "192.168.157.130"//rabbitmq ip
    };

    //创建连接
    var connection = factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();

    //定义一个Direct类型交换机
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);

    //定义队列1
    channel.QueueDeclare(queueName1, false, false, false, null);
    //定义队列2
    channel.QueueDeclare(queueName2, false, false, false, null);

    //将队列绑定到交换机
    channel.QueueBind(queueName1, exchangeName, routeKey, null);
    channel.QueueBind(queueName2, exchangeName, routeKey, null);

    //生成两个队列的消费者
    ConsumerGenerator(queueName1);
    ConsumerGenerator(queueName2);


    Console.WriteLine($"nRabbitMQ连接成功,nn请输入消息,输入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //发布消息
        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();
}

/// <summary>
/// 根据队列名称生成消费者
/// </summary>
/// <param name="queueName"></param>
static void ConsumerGenerator(string queueName)
{
    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "admin",//用户名
        Password = "admin",//密码
        HostName = "192.168.157.130"//rabbitmq ip
    };

    //创建连接
    var connection = factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();

    //事件基本消费者
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

    //接收到消息事件
    consumer.Received  = (ch, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body);

        Console.WriteLine($"Queue:{queueName}收到消息: {message}");
        //确认该消息已被消费
        channel.BasicAck(ea.DeliveryTag, false);
    };
    //启动消费者 设置为手动应答消息
    channel.BasicConsume(queueName, false, consumer);
    Console.WriteLine($"Queue:{queueName},消费者已启动");
}

运行:

澳门新濠3559 5

最后

RabbitMQ在 spring cloud 中做为消息总线,负责传递和分发系统消息,是非常重要的一个角色,spring cloud bus 动态加载配置就是使用消息总线,把重新拉去配置的消息分发到各个连接到消息总线的微服务。在 spring cloud steam 的消息驱动模型中同样使用了RabbitMQ。
不仅如此,RabbitMQ本身也是一个非常高效的消息服务器,可以用在服务之间异步调用,以及RPC远程调用(在消息头中增加 Reply Queue 来监听调用返回信息)。

AMQP的设计理念与数据通信网络中的路由协议有些相似。从应用程序角度,AMQP的应用也是服务器/客户端模式。但是在消息队列中,AMQP通过队列的状态决定生产者(Producer)、消费者(Consumer)之间的连接。

  • 个人QQ:499452441
  • 微信公众号:lqdevOps

使用RabbitMQ的Exchange

前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

澳门新濠3559 6

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

RabbitMQ提供了四种Exchange模式:direct,fanout,topic,header 。但是 header模式在实际使用中较少,所以这里只介绍前三种模式。

Exchange不是消费者关心的,所以消费者的代码完全不用变,用上面的消费者就行了。
由于避免文章过长,影响阅读,所以只贴了部分代码,但是demo里面是完整可运行的,详细代码请查看demo。

AMQP简介

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦和通讯。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,具有很高的易用性和可用性。

[2]

2.关于配置,以上两步就完成了(Springboot真的太方便了,写代码都是愉悦的)。正常使用时,若作为一个消费者,我们会配置一个接收队列,这里为了示例,直接以最小配置来演示下。

RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

AMQP协议中的几个重要概念

  • Queue 是RabbitMQ的内部对象,用于存储消息。RabbitMQ中的消息只能存储在 Queue 中,消费者从 Queue 中获取消息并消费。
  • Exchange 生产者将消息发送到 Exchange,由 Exchange 根据一定的规则将消息路由到一个或多个 Queue 中(或者丢弃)。
  • Binding RabbitMQ中通过 BindingExchangeQueue 关联起来。
  • Binding key 在绑定(Binding) ExchangeQueue 的同时,一般会指定一个 binding key
  • Routing key 生产者在将消息发送给 Exchange 的时候,一般会指定一个 routing key,来指定这个消息的路由规则。 Exchange 会根据 routing keyExchange Type 以及 Binding key 的匹配情况来决定把消息路由到哪个 Queue
  • Exchange Types RabbitMQ常用的Exchange Type有 fanoutdirecttopicheaders 这四种。
    • fanout 这种类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,这时 Routing key 不起作用。
    • direct 这种类型的Exchange路由规则也很简单,它会把消息路由到那些 binding keyrouting key完全匹配的Queue中。
    • topic 这种类型的Exchange的路由规则支持 binding keyrouting key 的模糊匹配,会把消息路由到满足条件的Queue。 binding key 中可以存在两种特殊字符 *#,用于做模糊匹配,其中 * 用于匹配一个单词,# 用于匹配多个单词(可以是零个),单词以 .为分隔符。
    • headers 这种类型的Exchange不依赖于 routing keybinding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

BindingKey由消费者在Binding交换器与消息队列时指定。Routing Key在生产者发送消息时指定。两者的匹配方式由交换类型决定。

2018-07-24 22:59:00.777 INFO 11424 --- [cTaskExecutor-1] c.l.l.springboot.chapter12.Consumer : 接收的消息为: hello,rabbitmq
定义生产者
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "admin",//用户名
    Password = "admin",//密码
    HostName = "192.168.157.130"//rabbitmq ip
};

//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//声明一个队列
channel.QueueDeclare("hello", false, false, false, null);

Console.WriteLine("nRabbitMQ连接成功,请输入消息,输入exit退出!");

string input;
do
{
    input = Console.ReadLine();

    var sendBytes = Encoding.UTF8.GetBytes(input);
    //发布消息
    channel.BasicPublish("", "hello", null, sendBytes);

} while (input.Trim().ToLower()!="exit");
channel.Close();
connection.Close();

使用Spring AMQP收发消息

新建一个maven工程,修改pom.xml引入 spring amqp 依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.2.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

java 目录中创建一个包 demo ,在包中创建启动入口 SpringAmqpApplication.java

public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(SpringAmqpApplication.class, args);
    Sender sender = context.getBean("sender", Sender.class);
    sender.sendMsg("测试Spring AMQP发送消息");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    context.close();
}

@Bean
CachingConnectionFactory myConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setHost("10.47.160.238");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    return connectionFactory;
}

@Bean
Exchange myExchange() {
    return ExchangeBuilder.topicExchange("test.topic").durable().build();
}

@Bean
Queue myQueue() {
    return QueueBuilder.durable("myQueue").build();
}

@Bean
public Binding myExchangeBinding(@Qualifier("myExchange") Exchange topicExchange,
                                 @Qualifier("myQueue") Queue queue) {
    return BindingBuilder.bind(queue).to(topicExchange).with("test.#").noargs();
}

@Bean
public RabbitTemplate myExchangeTemplate(CachingConnectionFactory myConnectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(myConnectionFactory);
    rabbitTemplate.setExchange("test.topic");
    rabbitTemplate.setRoutingKey("test.abc.123");
    return rabbitTemplate;
}

demo 包下创建 Sender.java

@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String content) {
        rabbitTemplate.convertAndSend(content);
        System.out.println("发送消息: '"   content   "'");
    }

}

demo 包下创建Receiver.java

@Component
public class Receiver {

    @RabbitListener(queues = "myQueue")
    public void processMessage(Message message) {
        byte[] body = message.getBody();
        System.out.println("收到消息: '"   new String(body)   "'");
    }

}

回头看下代码,在 SpringAmqpApplication.java 中创建了程序启动入口 main 方法,为了有时间把收到的消息打印出来,让主线程 sleep 了1秒,配置了几个和 RabbitMQ 相关的重要配置:

  • RabbitMQ 的连接 CachingConnectionFactory
  • 创建了一个名为 test.topic 并且类型为 topicExchange
  • 创建了一个名为 myQueueQueue
  • 把上边创建的 QueueExchange 进行了绑定,并指定 binding keytest.#
  • 最后还配置了一个spring封装的模板工具类 rabbitTemplate,指定了 ExchangeRouting key,用这个 rabbitTemplate 发送消息, 会把消息发送到名为 test.topicExchange,并且带有 Routing key test.abc.123

Sender.javaReceiver.java 的代码就比较简单了,在 sendMsg 方法中使用 rabbitTemplate 发送消息。在 processMessage 方法上加了一个 @RabbitListener(queues = "myQueue") 注解,指定从 myQueue 这个队列中获取消息。

运行 main 方法启动工程,可以看到控制台打印出了发送的消息和接收的消息。

demo源码 spring-amqp-demo

Ubuntu下PHP RabbitMQ使用

/** * 简单示例 发送和接收队列消息 * @author oKong * */@RestControllerpublic class DemoController { //AmqpTemplate接口定义了发送和接收消息的基本操作,目前spring官方也只集成了Rabbitmq一个消息队列。。 @Autowired AmqpTemplate rabbitmqTemplate; @GetMapping public String send(String msg) { //发送消息 rabbitmqTemplate.convertAndSend("okong", msg); return "消息:"   msg   ",已发送"; }}

Topic Exchange

澳门新濠3559 7

所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上,

Exchange 将路由进行模糊匹配。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”,但是“XiaoChen.” 只会匹配到“XiaoChen.money”。

所以,Topic Exchange 使用非常灵活。

string exchangeName = "TestTopicChange";
string queueName = "hello";
string routeKey = "TestRouteKey.*";

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "admin",//用户名
    Password = "admin",//密码
    HostName = "192.168.157.130"//rabbitmq ip
};

//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();

//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);

//定义队列1
channel.QueueDeclare(queueName, false, false, false, null);

//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null);



Console.WriteLine($"nRabbitMQ连接成功,nn请输入消息,输入exit退出!");

string input;
do
{
    input = Console.ReadLine();

    var sendBytes = Encoding.UTF8.GetBytes(input);
    //发布消息
    channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);

} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();

运行

澳门新濠3559 8

Demo下载:DotNetCore.RabbitMQ

最后:欢迎加入 .net core 交流群一起学习,群号:4656606 澳门新濠3559 9

交换器(Exchange)接受来自生产者的消息,并根据不同路由算法将消息发送到消息队列。

控制台界面,在Queues标签页,也可以查看到队列okong的消息。

定义消费者
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = "admin",//用户名
                Password = "admin",//密码
                HostName = "192.168.157.130"//rabbitmq ip
            };

            //创建连接
            var connection = factory.CreateConnection();
            //创建通道
            var channel = connection.CreateModel();

            //事件基本消费者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //接收到消息事件
            consumer.Received  = (ch, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine($"收到消息: {message}");
                //确认该消息已被消费
                channel.BasicAck(ea.DeliveryTag, false);
            };
            //启动消费者 设置为手动应答消息
            channel.BasicConsume("hello", false, consumer);
            Console.WriteLine("消费者已启动");
            Console.ReadKey();
            channel.Dispose();
            connection.Close();

在CentOS上安装RabbitMQ流程

4.访问:, 在控制台就可以看见消费者已经消费到此条消息了:

RabbitMQ常用命令

启用Web控制台

rabbitmq-plugins enable rabbitmq_management

开启服务

systemctl start rabbitmq-server.service

停止服务

systemctl stop rabbitmq-server.service

查看服务状态

systemctl status rabbitmq-server.service

查看RabbitMQ状态

rabbitmqctl status

添加用户赋予管理员权限

rabbitmqctl  add_user  username  password
rabbitmqctl  set_user_tags  username  administrator

查看用户列表

rabbitmqctl list_users

删除用户

rabbitmqctl delete_user username

修改用户密码

rabbitmqctl oldPassword Username newPassword

访问Web控制台

http://服务器ip:15672/ 注意配置防火墙,默认用户名密码都是guest,若新建用户一定要记得配置权限。

澳门新濠3559 10

AMQP全称是Advanced MessageQueuing Protocol (高级消息队列协议)。其官方网站在[1]。

个人博客:

RabbitMQ安装

RabbitMQ安装,网上已经有许多教程了,这里简单介绍一下在CentOS下安装RabbitMQ。使用的版本为3.6.12最新版。

1.首先安装erlang

rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

2.然后安装socat

yun install socat

3.最后安装RabbitMQ

rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

1.协议

 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

.NET Core 使用RabbitMQ

RabbitMQ客户端C 安装详细记录

# rabbitmq相关配置spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest
运行

澳门新濠3559 11

启动了一个生产者,两个消费者,可以看见两个消费者都能收到消息,消息投递到哪个消费者是由RabbitMQ决定的。

交换器与消息队列的关联通过Binding实现。交换器与多个消息队列Binding后会形成一张“路由表”,其中存储的信息包括消息队列的限制条件既Binding Key。交换器收到消息时会解析其Header中的Routing Key,根据交换类型(Exchange Type)将消息路由到消息队列。流程以RabbitMQ官方的简介为例[4]:

配置队列处理类,这里的队列就是上面配置的队列名称。:Consumer.java

Direct Exchange

澳门新濠3559 12

所有发送到Direct Exchange的消息被转发到具有指定RouteKey的Queue。

Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();

//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);

//定义一个队列
channel.QueueDeclare(queueName, false, false, false, null);

//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null);

运行:

澳门新濠3559 13

用Python尝试RabbitMQ

完整示例:chapter-12

澳门新濠3559 14

消息队列(Message Queue)在消息没有被消费者消费时将其缓存,当消费者与消息队列连接时,消息队列会把消息转发给消费者。

澳门新濠3559 15公众号

消息(Message)是AMQP通信的基本因素。消息由Header和Body组成。与TCP/IP协议类似,Header包含的是各种属性信息,Body是真正传输的数据。

配置一个名为okong的队列

一、简介

1.application.properties配置加入rabbitmq相关配置。

[4]

题外话:其实Redis也有提供队列功能。但我觉得,redis还是专门用在缓存方面吧。

AMQP的服务器(Broker)主要由交换器、消息、队列组成(有些文献归类为两项:交换器与消息队列)。Broker的主要功能是消息的路由和缓存。在RabbitMQ中,交换器分为持久交换器、临时交换器、自动删除交换器。对于需要保障可靠性的消息,RabbitMQ可以将消息、队列和交换器的数据写入本地硬盘。而对于响应时间敏感的消息,RabbitMQ可以不配置持久化机制。

@Component//@RabbitListener 监听 okong 队列@RabbitListener(queues = "okong")@Slf4jpublic class Consumer { /** * @RabbitHandler 指定消息的处理方法 * @param message */ @RabbitHandler public void process(String message) { log.info("接收的消息为: {}", message); }}

AMQP是一个应用层的异步消息传递协议,为面向消息的中间件而设计。其目的是通过协议使应用模块之间或应用程序与中间件等进行充分解耦。而在设计初期,AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议。现在已经有相当一部分遵循AMQP的服务器和客户端供使用。其中RabbitMQ是AMQP的一款开源标准实现。RabbitMQ的官方网站参见[2].

本章节主要是对RabbitMQ的集成和简单使用进行了说明,对于高并发系统而言,消息队列是一个常见的解决方案了。比如实现异步消息的通知,实现消费者/生产者模式等。由于对rabbitmq没有过多的了解,详细的用法及相关消息队列的知识,可自行搜索相关资料下,这里就不阐述了。前段时间买了本关于RabbitMQ方面的书籍,等看完了,也希望能单独写一篇关于消息队列的文章,敬请期待!

目前互联网上很多大佬都有SpringBoot系列教程,如有雷同,请多多包涵了。本文是作者在电脑前一字一句敲的,每一步都是实践的。若文中有所错误之处,还望提出,谢谢。

澳门新濠3559 16

0.老规矩,加入pom依赖,这已经是Springboot的套路了。

2.Broker模型

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

AMQP现在已经成为OASIS的标准之一。

上节讲了缓存数据库redis的使用,在实际工作中,一般上在系统或者应用间通信或者进行异步通知(登录后发送短信或者邮件等)时,都会使用消息队列进行解决此业务场景的解耦问题。这章节讲解下消息队列RabbitMQ的集成和简单使用示例。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ 的详细介绍:请点这里
RabbitMQ 的下载地址:请点这里

编写消息发送类,这里直接写成api方法,方便调试。DemoController.java

[1]

澳门新濠3559 17

AMQP协议分为三层:ModelLayer,规范服务器和Broker行为;Session Layer定义客户端与服务器端Broker的上下文;Transport Layer传输二进制数据流。

关于AMQP:

[3]

3.启动应用,正常配置成功,在Rabbitmq的控制台,是可以看见连接对象的。说明已经正常启动了。

RabbitMQ集群环境生产实例部署

RabbitAutoConfiguration类是其自动加载配置类。

交换类型(Exchange Type)分为Direct(单播,又译为直连式),Topic(组播,又译为主题式),Fanout(广播)。对于Direct,Routing Key必须与BindingKey完全一致时匹配才成功;对于Topic,只要Routing Key符合Binding Key指定的模式,既Binding Key可以为一个匹配模式;而对于Fanout,Routing Key和Binding Key不受任何约束。默认情况下交换类型为Direct。

澳门新濠3559 18第十二章:RabbitMQ的集成和使用

AMQP全称是Advanced MessageQueuing Protocol (高级消息队列协议)。其官方网站在[1]。 AMQP是一个应用层的异步消息传递协议,为面向消息的...

@Configurationpublic class RabbitConfig { /** * 定义一个名为:oKong 的队列 * @return */ @Bean public Queue okongQueue() { return new Queue; }}

推荐阅读:

CentOS 5.6 安装RabbitMQ

编辑:编程 本文来源:这已经是,支持消息集群和分布式部署

关键词: 澳门新濠3559