博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringBoot整合RabbitMQ
阅读量:4096 次
发布时间:2019-05-25

本文共 7718 字,大约阅读时间需要 25 分钟。

1、新建一个springboot工程

        详情略。

2、pom.xml添加依赖包

org.springframework.boot
spring-boot-starter-amqp
org.projectlombok
lombok

3、application.yml配置

#对于rabbitMQ的支持spring:  rabbitmq:    host: 127.0.0.1    port: 5672       #15672是管理端口    username: guest    password: guest    publisher-confirms: true    #  消息发送到交换机确认机制,是否确认回调server:  port: 8080

4、工程结构

工程结构

5、exchangeConfig配置

package com.zbf.springbootrabbit.config;import org.springframework.amqp.core.DirectExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * Description: * * @author zbf * date: 2018/9/7 10:44 * @version 1.0 */@Configurationpublic class ExchangeConfig {       @Bean    public DirectExchange directExchange() {        DirectExchange directExchange = new DirectExchange(RabbitMqConfig.EXCHANGE, true, false);        return directExchange;    }}

6、QueueConfig配置

package com.zbf.springbootrabbit.config;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * Description: * * @author zbf * date: 2018/9/7 10:45 * @version 1.0 */@Configurationpublic class QueueConfig {    @Bean    public Queue firstQueue() {               return new Queue("first-queue",true,false,false);    }    @Bean    public Queue secondQueue() {        return new Queue("second-queue",true,false,false);    }}

7、RabbitMqConfig配置

package com.zbf.springbootrabbit.config;import com.zbf.springbootrabbit.mqcallback.MsgSendConfirmCallBack;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * Description: * * @author zbf * date: 2018/9/7 10:46 * @version 1.0 */@Configurationpublic class RabbitMqConfig {    /** 消息交换机的名字*/    public static final String EXCHANGE = "exchangeTest";    /** 队列key1*/    public static final String ROUTINGKEY1 = "queue_one_key1";    /** 队列key2*/    public static final String ROUTINGKEY2 = "queue_one_key2";    @Autowired    private QueueConfig queueConfig;    @Autowired    private ExchangeConfig exchangeConfig;    /**     * 连接工厂     */    @Autowired    private ConnectionFactory connectionFactory;    /**     将消息队列1和交换机进行绑定     */    @Bean    public Binding binding_one() {        return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY1);    }    /**     * 将消息队列2和交换机进行绑定     */    @Bean    public Binding binding_two() {        return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY2);    }    /**     * queue listener  观察 监听模式     * 当有消息到达时会通知监听在对应的队列上的监听对象     * @return     */    @Bean    public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);        simpleMessageListenerContainer.addQueues(queueConfig.firstQueue());        simpleMessageListenerContainer.setExposeListenerChannel(true);        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);        simpleMessageListenerContainer.setConcurrentConsumers(1);        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认        return simpleMessageListenerContainer;    }    /**     * 定义rabbit template用于数据的接收和发送     * @return     */    @Bean    public RabbitTemplate rabbitTemplate() {        RabbitTemplate template = new RabbitTemplate(connectionFactory);        /**若使用confirm-callback或return-callback,         * 必须要配置publisherConfirms或publisherReturns为true         * 每个rabbitTemplate只能有一个confirm-callback和return-callback         */        template.setConfirmCallback(msgSendConfirmCallBack());        //template.setReturnCallback(msgSendReturnCallback());        /**         * 使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true,         * 可针对每次请求的消息去确定’mandatory’的boolean值,         * 只能在提供’return -callback’时使用,与mandatory互斥         */        //  template.setMandatory(true);        return template;    }    /**     * 消息确认机制     * Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,     * 哪些可能因为broker宕掉或者网络失败的情况而重新发布。     * 确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)     * 在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。     * @return     */    @Bean    public MsgSendConfirmCallBack msgSendConfirmCallBack(){        return new MsgSendConfirmCallBack();    }}

8、MsgSendConfirmCallBack消息回调

package com.zbf.springbootrabbit.mqcallback;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;/** * Description: * * @author zbf * date: 2018/9/7 10:50 * @version 1.0 */public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        System.out.println("MsgSendConfirmCallBack  , 回调id:" + correlationData);        if (ack) {            System.out.println("消息消费成功");        } else {            System.out.println("消息消费失败:" + cause + "\n重新发送");        }    }}

9、生产者

package com.zbf.springbootrabbit.sender;import com.zbf.springbootrabbit.config.RabbitMqConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/** * Description: * * @author zbf * date: 2018/9/7 10:52 * @version 1.0 */@Slf4j@Componentpublic class FirstSender {    @Autowired    private RabbitTemplate rabbitTemplate;    /**     * 发送消息     *     * @param uuid     * @param message 消息     */    public void send(String uuid, Object message) {        CorrelationData correlationId = new CorrelationData(uuid);        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY2,                message, correlationId);    }}

10、消费者

package com.zbf.springbootrabbit.reciever;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * Description: * * @author zbf * date: 2018/9/7 10:57 * @version 1.0 */@Componentpublic class FirstConsumer {    @RabbitListener(queues = {"first-queue","second-queue"}, containerFactory = "rabbitListenerContainerFactory")    public void handleMessage(String message) throws Exception {        // 处理消息        System.out.println("FirstConsumer {} handleMessage :"+message);    }}

11、测试controller

package com.zbf.springbootrabbit.controllelr;import com.zbf.springbootrabbit.sender.FirstSender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import java.util.UUID;/** * Description: * * @author zbf * date: 2018/9/7 10:58 * @version 1.0 */@RestControllerpublic class SendController {    @Autowired    private FirstSender firstSender;    @GetMapping("/send")    public String send(String message){        String uuid = UUID.randomUUID().toString();        firstSender.send(uuid,message);        return uuid;    }}

接下来,启动项目,浏览器输入zbf,即可看到控制台消费者输出的信息。

 

 

 

 

 

 

你可能感兴趣的文章
【Lua】Mac系统下配置SublimeText的Lua编译环境
查看>>
【C#】利用Conditional属性完成编译忽略
查看>>
【Unity】微信登录后将头像存为bytes,将bytes读取成sprite图片
查看>>
【Unity】使用GPS定位经纬度
查看>>
【UGUI/NGUI】一键换Text/Label字体
查看>>
【C#】身份证本地验证
查看>>
【Unity】坑爹的Bug
查看>>
【算法】求数组中某两个数的和为目标值
查看>>
如何高效学习动态规划?
查看>>
动态规划法(六)鸡蛋掉落问题(一)
查看>>
LeetCode 887.鸡蛋掉落(C++)
查看>>
奇异值分解(SVD)的原理详解及推导
查看>>
算法数据结构 思维导图学习系列(1)- 数据结构 8种数据结构 数组(Array)链表(Linked List)队列(Queue)栈(Stack)树(Tree)散列表(Hash)堆(Heap)图
查看>>
求LCA最近公共祖先的离线Tarjan算法_C++
查看>>
Leetcode 834. 树中距离之和 C++
查看>>
【机器学习】机器学习系统SysML 阅读表
查看>>
最小费用最大流 修改的dijkstra + Ford-Fulksonff算法
查看>>
最小费用流 Bellman-Ford与Dijkstra 模板
查看>>
实现高性能纠删码引擎 | 纠删码技术详解(下)
查看>>
scala(1)----windows环境下安装scala以及idea开发环境下配置scala
查看>>