SpringBoot整合Kafka
SpringBoot整合Kafka
1.加依赖
org.springframework.kafka spring-kafka
2.加配置
spring: kafka: bootstrap-servers: 192.168.101.129:9092 # Kafka服务器地址 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: my-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.启动类加注解@EnableKafka(最好加上)
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.EnableKafka; @SpringBootApplication @EnableKafka public class Springbootdemo3Application { public static void main(String[] args) { SpringApplication.run(Springbootdemo3Application.class, args); } }
4 编写生产者和消费者,可以测测看
4.1 生产者
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.ExecutionException; @RestController @RequestMapping("/kafka-test") public class KafkaTestController { @Autowired private KafkaTemplate kafkaTemplate; @PostMapping("/send") public String sendMessage(String param) throws ExecutionException, InterruptedException { /**
- send(String topic, V data)
- 参数:
- topic: 要发送消息的主题名称。
- data: 要发送的消息数据。
- 说明: 发送消息到指定的主题,不指定分区。
- send(String topic, K key, V data)
- 参数:
- topic: 要发送消息的主题名称。
- key: 消息的键(key),用于分区选择。
- data: 要发送的消息数据。
- 说明: 发送消息到指定的主题,并使用键来选择分区。
- send(String topic, Integer partition, K key, V data)
- 参数:
- topic: 要发送消息的主题名称。
- partition: 指定要发送到的分区号。
- key: 消息的键(key),用于分区选择。
- data: 要发送的消息数据。
- 说明: 发送消息到指定的主题和分区,并使用键来选择分区。
- send(Message message)
- 参数:
- message: 一个 Message 对象,包含主题、键、分区、消息头和消息体等信息。
- 说明: 发送一个完整的 Message 对象到 Kafka。
- sendDefault(V data)
- 参数:
- data: 要发送的消息数据。
- 说明: 发送消息到默认的主题(通过配置 spring.kafka.template.default-topic 指定)。
- sendDefault(K key, V data)
- 参数:
- key: 消息的键(key),用于分区选择。
- data: 要发送的消息数据。
- 说明: 发送消息到默认的主题,并使用键来选择分区。
- sendDefault(Integer partition, K key, V data)
- 参数:
- partition: 指定要发送到的分区号。
- key: 消息的键(key),用于分区选择。
- data: 要发送的消息数据。
- 说明: 发送消息到默认的主题和分区,并使用键来选择分区。 */ // 发送消息的主题 String topic = “test-kafka”; // 发送的消息 String message = param; ListenableFuture> send = kafkaTemplate.send(topic, message); SendResult result = send.completable().get(); System.out.println(“消息发送成功: " + result.toString()); return result.toString(); } }
4.2 消费者
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { /**
- kafka接收消息
- topics 是你监听的主题
- groupId 是你监听的组名
- @param message */ @KafkaListener(topics = “test-kafka”, groupId = “my-group”) public void listen(String message) { System.out.println(“kafak接收消息 " + message); } }