目录

SpringBoot整合Kafka

SpringBoot整合Kafka

1.加依赖

org.springframework.kafka spring-kafka

2.加配置

spring: kafka: bootstrap-servers: 192.168.101.129:9092 # Kafka服务器地址 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: my-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.启动类加注解@EnableKafka(最好加上)

import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.EnableKafka; @SpringBootApplication @EnableKafka public class Springbootdemo3Application { public static void main(String[] args) { SpringApplication.run(Springbootdemo3Application.class, args); } }

4 编写生产者和消费者,可以测测看

4.1 生产者

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.ExecutionException; @RestController @RequestMapping("/kafka-test") public class KafkaTestController { @Autowired private KafkaTemplate kafkaTemplate; @PostMapping("/send") public String sendMessage(String param) throws ExecutionException, InterruptedException { /**

  • send(String topic, V data)
  • 参数:
  • topic: 要发送消息的主题名称。
  • data: 要发送的消息数据。
  • 说明: 发送消息到指定的主题,不指定分区。
  • send(String topic, K key, V data)
  • 参数:
  • topic: 要发送消息的主题名称。
  • key: 消息的键(key),用于分区选择。
  • data: 要发送的消息数据。
  • 说明: 发送消息到指定的主题,并使用键来选择分区。
  • send(String topic, Integer partition, K key, V data)
  • 参数:
  • topic: 要发送消息的主题名称。
  • partition: 指定要发送到的分区号。
  • key: 消息的键(key),用于分区选择。
  • data: 要发送的消息数据。
  • 说明: 发送消息到指定的主题和分区,并使用键来选择分区。
  • send(Message message)
  • 参数:
  • message: 一个 Message 对象,包含主题、键、分区、消息头和消息体等信息。
  • 说明: 发送一个完整的 Message 对象到 Kafka。
  • sendDefault(V data)
  • 参数:
  • data: 要发送的消息数据。
  • 说明: 发送消息到默认的主题(通过配置 spring.kafka.template.default-topic 指定)。
  • sendDefault(K key, V data)
  • 参数:
  • key: 消息的键(key),用于分区选择。
  • data: 要发送的消息数据。
  • 说明: 发送消息到默认的主题,并使用键来选择分区。
  • sendDefault(Integer partition, K key, V data)
  • 参数:
  • partition: 指定要发送到的分区号。
  • key: 消息的键(key),用于分区选择。
  • data: 要发送的消息数据。
  • 说明: 发送消息到默认的主题和分区,并使用键来选择分区。 */ // 发送消息的主题 String topic = “test-kafka”; // 发送的消息 String message = param; ListenableFuture> send = kafkaTemplate.send(topic, message); SendResult result = send.completable().get(); System.out.println(“消息发送成功: " + result.toString()); return result.toString(); } }

4.2 消费者

import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { /**

  • kafka接收消息
  • topics 是你监听的主题
  • groupId 是你监听的组名
  • @param message */ @KafkaListener(topics = “test-kafka”, groupId = “my-group”) public void listen(String message) { System.out.println(“kafak接收消息 " + message); } }

5.测试成功

https://i-blog.csdnimg.cn/direct/1f157779427f4284920abbeb9dca0bc0.png