Kafka-02 @KafkaListener学习
创始人
2024-11-18 05:08:49
0

一. 引入依赖

SpringBoot 和 Kafka 搭配使用的场景,引入 spring-kafka 即可;

     org.springframework.kafka     spring-kafka     2.8.11  

二. 核心结构

先来看一下 spring-kafka 核心图;

当我们在 Spring 中注册一个 Listener,框架就会为我们自动生成一个对应的 ConcurrentMessageListenerContainer 容器来管理,再根据你配置的并发度来创建多个 KafkaMessageListenerContainer 容器,每个 KafkaMessageListenerContainer 可以粗浅的认为是一个线程,这个线程会不断向 server 端发起 poll 请求来实现监听;

  • ConcurrentMessageListenerContainer 是通过 ConcurrentMessageListenerContainerFactory 生产的;一般我们不需要去自定义 ConcurrentMessageListenerContainerFactory,Spring 容器会生成默认的 ConcurrentMessageListenerContainerFactory,也有场景需要我们去自定义 ContainerFactory;

  • ConcurrentMessageListenerContainer 中有一个属性 List> containers,就是用来存放各个 KafkaMessageListenerContainer;需要厘清两者的关系;

在这里插入图片描述

三. 核心流程

先来看一下核心方法的调用流程图,略去了部分非核心流程;

执行流程如下:

  1. Spring 启动;
  2. Spring 生命周期为 finishRefresh() 时,调用 KafkaListenerEndpointRegistry 中的 start();
  3. 根据 @KafkaListener 创建对应数量的 ConcurrentMessageListenerContainer;
  4. 根据并发配置 concurrency 往 ConcurrentMessageListenerContainer 创建对应数量的 KafkaMessageListenerContainer;
  5. 在每个 KafkaMessageListenerContainer 中创建一个 SimpleAsyncTaskExecutor,值得注意的是 SimpleAsyncTaskExecutor 的作用是创建一条新的线程,并在线程停止时执行 stop();
  6. 创建一个 ListenerConsumer 注册到 SimpleAsyncTaskExecutor 中,这里的 ListenerConsumer 是一个 Runnable 对象,并且内部会创建聚合一个 KafkaConsumer 对象,SimpleAsyncTaskExecutor 中创建出的线程会执行 ListenerConsumer.run();
  7. ListenerConsumer 的 run() 被调用;
  8. run 中开启自旋;
  9. 不断调用 kafka-client 提供的 poll() 拉取新的消息;
    • 收到新的消息就执行,执行完了就继续自旋;
    • 收不新消息,重启下一轮自旋;

四. 分析

1. 启动入口

入口在 SpringApplication.run() -> SpringApplication.refreshContext() -> AbstractApplicationContext.refresh() -> AbstractApplicationContext.finishRefresh();

这个 finishRefresh() 中会调用 LifecycleProssor.onRefresh() 启动 kafka 监听器;

// ------------------------------ AbstractApplicationContext ---------------------------- protected void finishRefresh() {    clearResourceCaches();     initLifecycleProcessor();     // 调用 LifecycleProcessor.onRefresh(),Spring 中默认的是 DefaultLifecycleProcessor    getLifecycleProcessor().onRefresh();     publishEvent(new ContextRefreshedEvent(this));     if (!NativeDetector.inNativeImage()) {       LiveBeansView.registerApplicationContext(this);    } }    // ------------------------------ DefaultLifecycleProcessor ---------------------------- public void onRefresh() {     startBeans(true);     this.running = true; }    // ------------------------------ DefaultLifecycleProcessor ---------------------------- private void doStart(Map lifecycleBeans, String beanName, boolean autoStartupOnly) { 	Lifecycle bean = lifecycleBeans.remove(beanName); 	if (bean != null && bean != this) { 		String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName); 		for (String dependency : dependenciesForBean) { 			doStart(lifecycleBeans, dependency, autoStartupOnly); 		} 		if ((!autoStartupOnly || !(bean instanceof SmartLifecycle) ||               ((SmartLifecycle) bean).isAutoStartup())) { 			try {                 // 获取容器中的 LifeCycle bean 对象,调用它的 start()                 // SpringKafka 中对应的是 KafkaListenerEndpointRegistry                 // 我们重点看一下 KafkaListenerEndpointRegistry.start() 				bean.start(); 			} 			catch (Throwable ex) { 				throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex); 			} 		} 	} } 

2. KafkaListenerEndpointRegistry

KafkaListenerEndpointRegistry 是 SpringKafka 中很重要的类,是一个 SmartLifecycle 实现类对象,它里面有一个属性 listenerContainers,存放了我们的 ConcurrentMessageListenerContainer 对象;

我们先看它的 start();

// ---------------------------- KafkaListenerEndpointRegistry --------------------------- public void start() {     // 轮询所有的 ConcurrentMessageListenerContainer 对象     // 执行 ConcurrentMessageListenerContainer.start()     for (MessageListenerContainer listenerContainer : getListenerContainers()) {         startIfNecessary(listenerContainer);     }     this.running = true; }    // ---------------------------- KafkaListenerEndpointRegistry --------------------------- private void startIfNecessary(MessageListenerContainer listenerContainer) {     if ((this.contextRefreshed && this.alwaysStartAfterRefresh)          || listenerContainer.isAutoStartup()) {         // 执行 ConcurrentMessageListenerContainer.start()         listenerContainer.start();     } }    // ---------------------------- AbstractMessageListenerContainer --------------------------- public final void start() {     checkGroupId();     synchronized (this.lifecycleMonitor) {         if (!isRunning()) {             // 调用真正干事的 doStart(),进入 ConcurrentMessageListenerContainer.doStart()             doStart();         }     } } 

我们看 ConcurrentMessageListenerContainer.doStart() 干了些啥;

3. ConcurrentMessageListenerContainer

我们看下 ConcurrentMessageListenerContainer.doStart() 干了啥;

// ---------------------------- ConcurrentMessageListenerContainer --------------------------- protected void doStart() {     if (!isRunning()) {         checkTopics();         ContainerProperties containerProperties = getContainerProperties();         TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();         if (topicPartitions != null && this.concurrency > topicPartitions.length) {             this.concurrency = topicPartitions.length;         }         setRunning(true);          // 1. 根据 @KafkaListener 中配置的 concurrency 轮询         for (int i = 0; i < this.concurrency; i++) {             // 2. 创建 KafkaMessageListenerContainer             KafkaMessageListenerContainer container =                 constructContainer(containerProperties, topicPartitions, i);                          // 3. 对刚创建出的 KafkaMessageListenerContainer 做一些配置             configureChildContainer(i, container);             if (isPaused()) {                 container.pause();             }                          // 4. 启动 KafkaMessageListenerContainer             container.start();                          // 5. 将 KafkaMessageListenerContainer 添加到 ConcurrentMessageListenerContainer 中             this.containers.add(container);         }     } } 

关键流程是第 3 步和第 4 步,我们分开来看;

3.1 configureChildContainer()

对刚创建出的 KafkaMessageListenerContainer 做一些配置;

这里创建了一个 SimpleAsyncTaskExecutor,设置进 KafkaMessageListenerContainer 中;

private void configureChildContainer(int index, KafkaMessageListenerContainer container) {     String beanName = getBeanName();     beanName = (beanName == null ? "consumer" : beanName) + "-" + index;     container.setBeanName(beanName);     ApplicationContext applicationContext = getApplicationContext();     if (applicationContext != null) {         container.setApplicationContext(applicationContext);     }     ApplicationEventPublisher publisher = getApplicationEventPublisher();     if (publisher != null) {         container.setApplicationEventPublisher(publisher);     }      // 设置 clinetIdSuffix,clientId 前缀     container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");     container.setGenericErrorHandler(getGenericErrorHandler());     container.setCommonErrorHandler(getCommonErrorHandler());     container.setAfterRollbackProcessor(getAfterRollbackProcessor());     container.setRecordInterceptor(getRecordInterceptor());     container.setBatchInterceptor(getBatchInterceptor());     container.setInterceptBeforeTx(isInterceptBeforeTx());     container.setListenerInfo(getListenerInfo());     AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();     if (exec == null) {         // 1. 创建出 SimpleAsyncTaskExecutor,并加入到 this.executors         exec = new SimpleAsyncTaskExecutor(beanName + "-C-");         this.executors.add(exec);                  // 2. 将当前创建的 SimpleAsyncTaskExecutor 设置到 KafkaMessageListenerContainer         container.getContainerProperties().setConsumerTaskExecutor(exec);     } } 

3.2 container.start()

调用 KafkaMessageListenerContainer 的 start(),最终调用 KafkaMessageListenerContainer.doStart();

protected void doStart() {     if (isRunning()) {         return;     }     ContainerProperties containerProperties = getContainerProperties();     checkAckMode(containerProperties);      Object messageListener = containerProperties.getMessageListener();     AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();     if (consumerExecutor == null) {         consumerExecutor = new SimpleAsyncTaskExecutor(             (getBeanName() == null ? "" : getBeanName()) + "-C-");         containerProperties.setConsumerTaskExecutor(consumerExecutor);     }     GenericMessageListener listener = (GenericMessageListener) messageListener;     ListenerType listenerType = determineListenerType(listener);          // 1. 创建 ListenerConsumer     // ListenerConsumer 是一个 Runnable 对象     // new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员     // 它的 run() 比较重要     this.listenerConsumer = new ListenerConsumer(listener, listenerType);     setRunning(true);     this.startLatch = new CountDownLatch(1);          // 2. 将 ListenerConsumer 任务放入到 SimpleAsyncTaskExecutor 中异步调用     this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer); } 

ListenerConsumer 是一个 Runnable 对象,new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员,我们看下 ListenerConsumer.run();

4. ListenerConsumer.run()

我们看下 ListenerConsumer 的 run();可以看到这个任务会进入自旋去处理任务;

public void run() {     ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());     publishConsumerStartingEvent();     this.consumerThread = Thread.currentThread();     setupSeeks();     KafkaUtils.setConsumerGroupId(this.consumerGroupId);     this.count = 0;     this.last = System.currentTimeMillis();     initAssignedPartitions();     publishConsumerStartedEvent();     Throwable exitThrowable = null;          // 开启自旋     while (isRunning()) {         // 通过 KafkaConsumer 向 kafka-server 发起 poll 请求         pollAndInvoke();     }     wrapUp(exitThrowable); } 

ListenerConsumer 的 pollAndInvoke() 比较绕,总之我们知道它会通过反射调用我们 @KafkaListener 声明的方法;

我们简单看下最终调我们 @KafkaListener 声明方法的地方;

4.1 HandlerAdapter.invoke()

调用到 RecordMessagingMessageListenerAdapter.invoke();

public Object invoke(Message message, Object... providedArgs) throws Exception {    if (this.invokerHandlerMethod != null) {        // 最终的执行入口        // 最后会通过反射调用我们的 @KafkaListener 声明的方法       return this.invokerHandlerMethod.invoke(message, providedArgs);    } else if (this.delegatingHandler.hasDefaultHandler()) {       Object[] args = new Object[providedArgs.length + 1];       args[0] = message.getPayload();       System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);       return this.delegatingHandler.invoke(message, args);    } else {       return this.delegatingHandler.invoke(message, providedArgs);    } } 

至此,SpringKafka 分析完毕;

相关内容

热门资讯

三分钟了解!一起宁德麻将钓蟹怎... 三分钟了解!一起宁德麻将钓蟹怎么赢的几率高,来玩德州竟然是真的有挂,详细教程(有挂详情)1、起透看视...
9分钟辅助!米兔麻将有没有挂,... 9分钟辅助!米兔麻将有没有挂,wepokE真是真的有挂,科技教程(有挂普及)在进入米兔麻将有没有挂辅...
八分钟实锤!白金岛自创房有外 ... 八分钟实锤!白金岛自创房有外 挂吗,wepOke原来真的有挂,高科技教程(有挂教程);1、白金岛自创...
3分钟攻略!全民雀神棋牌提升好... 3分钟攻略!全民雀神棋牌提升好牌概率,pokerx真是是真的有挂,力荐教程(有挂方法)该软件可以轻松...
2分钟发现!吉祥填大坑辅助器免... 2分钟发现!吉祥填大坑辅助器免费下载,epoker总是有挂,技巧教程(有挂规律)1、操作简单,无需注...
两分钟了解!微友辅助器免费版v... 两分钟了解!微友辅助器免费版v3.0,AAPOKER切实真的是有挂,新2025版(有挂解说)运微友辅...
7分钟辅助!潮汕闲来麻将app... 7分钟辅助!潮汕闲来麻将app有挂吗,aapOKER竟然是真的有挂,大神讲解(有挂插件);1、潮汕闲...
七分钟了解!衢州都莱双扣有没有... 七分钟了解!衢州都莱双扣有没有外 挂,aapoKer竟然是有挂,存在挂教程(有挂透视)小薇(透视辅助...
六分钟发现!雀友会广东潮汕麻将... 六分钟发现!雀友会广东潮汕麻将有挂吗,鱼扑克总是真的是有挂,详细教程(有挂秘籍);1)雀友会广东潮汕...
一分钟发现!奇迹陕西棋牌外 挂... 一分钟发现!奇迹陕西棋牌外 挂,aaPOKER竟然真的有挂,黑科技教程(有挂神器)暗藏猫腻,小编详细...