侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

Spring Boot Kafka概览、配置及优雅地实现发布订阅

2022-06-21 星期二 / 0 评论 / 0 点赞 / 62 阅读 / 7245 字

本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 xiaobaiai.net[TOC]1 前言本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看

本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 xiaobaiai.net

[TOC]

1 前言

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!

本篇文章主要介绍Spring Kafka的常用配置、主题自动创建、发布消息到集群、订阅消息(群组)、流处理配置以及嵌入式Kafka做测试配置相关内容,最后通过两种方式去实现消息的发布和订阅功能,其中一种是基于Spring Integration方式。本文内容基于Spring Kafka2.3.3文档Spring Boot Kafka相关文档,Spring创建了一个名为Spring kafka的项目,它封装了Apache的kafka客户端部分(生产者/消费者/流处理等),以便在Spring项目中快速集成kafka,Spring-Kafka项目提供了Apache Kafka自动化配置,通过Spring Boot的简化配置(以spring.kafka.*作为前缀的配置参数),在Spring Boot中使用Kafka特别简单。并且Spring Boot还提供了一个嵌入式Kafka代理方便做测试。

spring.kafka.bootstrap-servers=localhost:9092spring.kafka.consumer.group-id=myGroup

实现下面的所涉及到的功能实现,需要有如下环境:

  • Java运行或开发环境(JRE/JDK)
  • Kafka安装成功

更多的配置可以参考《Kafka,ZK集群开发或部署环境搭建及实验》这一篇文章。

.

本文尽量做到阐述逻辑清晰,主要路线就是全局介绍Spring Kafka的主要功能及重点配置,而Spring Boot对Spring Kafka进一步简化配置,通过Spring Boot中的Kafka几大注解实现发布订阅功能,同时通过Spring Integration + 自定义Kafka配置方式实现一个较为复杂的Kafka发布订阅功能,本文通过自己实验和整理了较久的时间,涵盖了Spring Kafka大部分内容,希望大家耐心读下来,有什么问题随时反馈,一起学习。

.

2 Spring Kafka功能概览

Spring Kafka、Spring Integration和Kafka客户端版本联系或者兼容性如下(截至2019年12月9日):

Spring for Apache KafkaSpring Integration for Apache Kafka Versionkafka-clients
2.3.x3.2.x2.3.1
2.2.x3.1.x2.0.1, 2.1.x, 2.2.x
2.1.x3.0.x1.0.x, 1.1.x, 2.0.0
1.3.x2.3.x0.11.0.x, 1.0.x
.

具体更多版本特点可以看官网,spring kafka当前最新为2.3.4版本。

.

Spring Kafka相关的注解有如下几个:

注解类型描述
EnableKafka启用由AbstractListenerContainerFactory在封面(covers)下创建的Kafka监听器注解端点,用于配置类;
EnableKafkaStreams启用默认的Kafka流组件
KafkaHandler在用KafkaListener注解的类中,将方法标记为Kafka消息监听器的目标的注解
KafkaListener将方法标记为指定主题上Kafka消息监听器的目标的注解
KafkaListeners聚合多个KafkaListener注解的容器注解
PartitionOffset用于向KafkaListener添加分区/初始偏移信息
TopicPartition用于向KafkaListener添加主题/分区信息

如使用@EnableKafka可以监听AbstractListenerContainerFactory子类目标端点,如ConcurrentKafkaListenerContainerFactoryAbstractKafkaListenerContainerFactory的子类。

public class ConcurrentKafkaListenerContainerFactory<K,V>extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
@Configuration @EnableKafka public class AppConfig {        @Bean        public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {                ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();                factory.setConsumerFactory(consumerFactory());                factory.setConcurrency(4);                return factory;        }        // other @Bean definitions }
.

@EnableKafka并不是在Spring Boot中启用Kafka必须的,Spring Boot附带了Spring Kafka的自动配置,因此不需要使用显式的@EnableKafka。如果想要自己实现Kafka配置类,则需要加上@EnableKafka,如果你不想要Kafka自动配置,比如测试中,需要做的只是移除KafkaAutoConfiguration

.
@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")

2.1 自动创建主题

广告 广告

评论区