目录

SpringBoot整合RabbitMQ

SpringBoot整合RabbitMQ

1、添加依赖

org.springframework.boot spring-boot-starter-web

org.springframework.boot spring-boot-starter-amqp

org.projectlombok lombok true

2 配置文件 (application.yml)

spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /

开启发送确认

publisher-confirm-type: correlated

开启发送失败退回

publisher-returns: true template: mandatory: true

开启ACK

listener: simple: acknowledge-mode: manual prefetch: 1

3 RabbitMQ配置类

// config/RabbitMQConfig.java package com.example.demo.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { // 交换机名称 public static final String EXCHANGE_NAME = “boot_topic_exchange”; // 队列名称 public static final String QUEUE_NAME = “boot_queue”; // 路由键 public static final String ROUTING_KEY = “boot.#”; // 声明交换机 @Bean public TopicExchange exchange() { return new TopicExchange(EXCHANGE_NAME, true, false); } // 声明队列 @Bean public Queue queue() { return new Queue(QUEUE_NAME, true); } // 绑定队列到交换机 @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY); } // 消息转换器 @Bean public Jackson2JsonMessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } // 配置RabbitTemplate @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jsonMessageConverter()); // 设置消息确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println(“消息发送成功:” + (correlationData != null ? correlationData.getId() : “”)); } else { System.out.println(“消息发送失败:” + cause); } }); // 设置消息返回回调 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { System.out.println(“消息丢失:” + new String(message.getBody()) + “, 应答码:” + replyCode + “, 原因:” + replyText + “, 交换机:” + exchange + “, 路由键:” + routingKey); }); return rabbitTemplate; } }

4 消息实体类

package com.example.demo.model; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.Date; @Data @NoArgsConstructor @AllArgsConstructor public class Message implements Serializable { private static final long serialVersionUID = 1L; private String id; private String content; private Date createTime; }

5 消息生产者

package com.example.demo.producer; import com.example.demo.config.RabbitMQConfig; import com.example.demo.model.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; import java.util.UUID; @Slf4j @Component public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; /**

  • 发送消息
  • @param content 消息内容 */ public void sendMessage(String content) { String id = UUID.randomUUID().toString(); Message message = new Message(id, content, new Date()); // 构建correlationData对象,用于做消息可靠性投递 CorrelationData correlationData = new CorrelationData(id); log.info(“发送消息内容: {}”, message); // 发送消息 rabbitTemplate.convertAndSend( RabbitMQConfig.EXCHANGE_NAME, “boot.message”, message, correlationData ); } }

6 消息消费者

package com.example.demo.consumer; import com.example.demo.config.RabbitMQConfig; import com.example.demo.model.Message; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Map; @Slf4j @Component public class MessageConsumer { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = RabbitMQConfig.QUEUE_NAME, durable = “true”), exchange = @Exchange(value = RabbitMQConfig.EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = RabbitMQConfig.ROUTING_KEY )) public void receiveMessage(@Payload Message message, @Headers Map headers, Channel channel) throws IOException { // 获取消息的唯一标识 long deliveryTag = (long) headers.get(AmqpHeaders.DELIVERY_TAG); try { // 处理消息 log.info(“接收到消息: {}”, message); // 模拟处理业务逻辑 Thread.sleep(1000); // 确认消息 channel.basicAck(deliveryTag, false); log.info(“消息处理成功,确认消息: {}”, deliveryTag); } catch (Exception e) { log.error(“消息处理失败”, e); // 处理消息失败,进行重新入队或丢弃 // 参数1:消息的唯一标识,参数2:是否批量处理,参数3:是否重新入队 channel.basicNack(deliveryTag, false, true); // 如果要拒绝消息且不重新入队 // channel.basicReject(deliveryTag, false); } } }

7 API控制器

package com.example.demo.controller; import com.example.demo.producer.MessageProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/api/message") public class MessageController { @Autowired private MessageProducer messageProducer; @GetMapping("/send") public String sendMessage(@RequestParam String content) { messageProducer.sendMessage(content); return “消息发送成功: " + content; } }

8 启动类

// DemoApplication.java package com.example.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }