本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 xiaobaiai.net[TOC]1 前言本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看
本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客
微信公众号小白AI
或者网站 xiaobaiai.net
[TOC]
1 前言
本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!
本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!
本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!
本篇文章主要介绍Spring Kafka的常用配置、主题自动创建、发布消息到集群、订阅消息(群组)、流处理配置以及嵌入式Kafka做测试配置相关内容,最后通过两种方式去实现消息的发布和订阅功能,其中一种是基于Spring Integration
方式。本文内容基于Spring Kafka2.3.3文档及Spring Boot Kafka相关文档,Spring创建了一个名为Spring kafka
的项目,它封装了Apache的kafka客户端部分(生产者/消费者/流处理等),以便在Spring项目中快速集成kafka,Spring-Kafka项目提供了Apache Kafka自动化配置,通过Spring Boot的简化配置(以spring.kafka.*
作为前缀的配置参数),在Spring Boot中使用Kafka特别简单。并且Spring Boot还提供了一个嵌入式Kafka代理方便做测试。
spring.kafka.bootstrap-servers=localhost:9092spring.kafka.consumer.group-id=myGroup
实现下面的所涉及到的功能实现,需要有如下环境:
- Java运行或开发环境(JRE/JDK)
- Kafka安装成功
更多的配置可以参考《Kafka,ZK集群开发或部署环境搭建及实验》
这一篇文章。
本文尽量做到阐述逻辑清晰,主要路线就是全局介绍Spring Kafka的主要功能及重点配置,而Spring Boot对Spring Kafka进一步简化配置,通过Spring Boot中的Kafka几大注解实现发布订阅功能,同时通过Spring Integration + 自定义Kafka配置方式实现一个较为复杂的Kafka发布订阅功能,本文通过自己实验和整理了较久的时间,涵盖了Spring Kafka大部分内容,希望大家耐心读下来,有什么问题随时反馈,一起学习。
.2 Spring Kafka功能概览
Spring Kafka、Spring Integration和Kafka客户端版本联系或者兼容性如下(截至2019年12月9日):
Spring for Apache Kafka | Spring Integration for Apache Kafka Version | kafka-clients |
---|---|---|
2.3.x | 3.2.x | 2.3.1 |
2.2.x | 3.1.x | 2.0.1, 2.1.x, 2.2.x |
2.1.x | 3.0.x | 1.0.x, 1.1.x, 2.0.0 |
1.3.x | 2.3.x | 0.11.0.x, 1.0.x |
具体更多版本特点可以看官网,spring kafka当前最新为2.3.4版本。
.Spring Kafka相关的注解有如下几个:
注解类型 | 描述 |
---|---|
EnableKafka | 启用由AbstractListenerContainerFactory 在封面(covers)下创建的Kafka监听器注解端点,用于配置类; |
EnableKafkaStreams | 启用默认的Kafka流组件 |
KafkaHandler | 在用KafkaListener注解的类中,将方法标记为Kafka消息监听器的目标的注解 |
KafkaListener | 将方法标记为指定主题上Kafka消息监听器的目标的注解 |
KafkaListeners | 聚合多个KafkaListener注解的容器注解 |
PartitionOffset | 用于向KafkaListener添加分区/初始偏移信息 |
TopicPartition | 用于向KafkaListener添加主题/分区信息 |
如使用@EnableKafka
可以监听AbstractListenerContainerFactory
子类目标端点,如ConcurrentKafkaListenerContainerFactory
是AbstractKafkaListenerContainerFactory
的子类。
public class ConcurrentKafkaListenerContainerFactory<K,V>extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
@Configuration @EnableKafka public class AppConfig { @Bean public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); return factory; } // other @Bean definitions }
.@EnableKafka
并不是在Spring Boot中启用Kafka必须的,Spring Boot附带了Spring Kafka的自动配置,因此不需要使用显式的@EnableKafka
。如果想要自己实现Kafka配置类,则需要加上@EnableKafka
,如果你不想要Kafka自动配置,比如测试中,需要做的只是移除KafkaAutoConfiguration
:
@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")
2.1 自动创建主题
