之前寫過關(guān)于 Apache Pulsar 的簡單示例,用來了解如何使用 Pulsar 這個新生代的消息隊列中間件,但是如果想要在項目中使用,還會欠缺很多,最明顯的就是 集成復(fù)雜,如果你用過其他消息中間件,比如 Kafka、RabbitMq,只需要簡單的引入 jar,就可以通過注解+配置快速集成到項目中。
開始一個 Pulsar Starter
既然已經(jīng)了解了 Apache Pulsar,又認(rèn)識了 spring-boot-starter,今天不妨來看下如何寫一個 pulsar-spring-boot-starter 模塊。
目標(biāo)
寫一個完整的類似 kafka-spring-boot-starter(springboot 項目已經(jīng)集成到 spring-boot-starter 中),需要考慮到很多 kafka 的特性, 今天我們主要實現(xiàn)下面幾個模板
- 在項目中夠通過引入 jar 依賴快速集成
- 提供統(tǒng)一的配置入口
- 能夠快速發(fā)送消息
- 能夠基于注解實現(xiàn)消息的消費
定義結(jié)構(gòu)
└── pulsar-starter
├── pulsar-spring-boot-starter
├── pulsar-spring-boot-autoconfigure
├── spring-pulsar
├── spring-pulsar-xx
├── spring-pulsar-sample
└── README.md
整個模塊的結(jié)構(gòu)如上其中pulsar-starter作為一個根模塊,主要控制子模塊依賴的其他 jar 的版本以及使用到的插件版本。類似于 Spring-Bom,這樣我們在后續(xù)升級 時,就可以解決各個第三方 jar 的可能存在版本沖突導(dǎo)致的問題。
- pulsar-spring-boot-starter
該模塊作為外部項目集成的直接引用 jar,可以認(rèn)為是 pulsar-spring-boot-starter 組件的入口,里面不需要寫任何代碼,只需要引入需要的依賴(也就是下面的子模塊)即可
- pulsar-spring-boot-autoconfigure
該模塊主要定義了 spring.factories 以及 AutoConfigure、Properties。也就是自動配置的核心(配置項+Bean 配置)
- spring-pulsar
該模塊是核心模塊,主要的實現(xiàn)都在這里
- spring-pulsar-xx
擴展模塊,可以對 spring-pulsar 做更細(xì)化的劃分
- spring-pulsar-sample
starter 的使用示例項目
實現(xiàn)
上面我們說到實現(xiàn)目標(biāo),現(xiàn)在看下各個模塊應(yīng)該包含什么內(nèi)容,以及怎么實現(xiàn)我們的目標(biāo)
- 入口 pulsar-spring-boot-starter
上面說到 starter 主要是引入整個模塊基礎(chǔ)的依賴即可,里面不用寫代碼。
< dependencies >
< dependency >
< groupId >com.sucl< /groupId >
< artifactId >spring-pulsar< /artifactId >
< version >${project.version}< /version >
< /dependency >
< dependency >
< groupId >com.sucl< /groupId >
< artifactId >pulsar-spring-boot-autoconfigure< /artifactId >
< version >${project.version}< /version >
< /dependency >
< /dependencies >
- pulsar-spring-boot-autoconfigure
- 添加 spring-boot 基礎(chǔ)的配置
< dependencies >
< dependency >
< groupId >org.springframework.boot< /groupId >
< artifactId >spring-boot< /artifactId >
< /dependency >
< dependency >
< groupId >org.springframework.boot< /groupId >
< artifactId >spring-boot-starter-logging< /artifactId >
< /dependency >
< dependency >
< groupId >org.springframework.boot< /groupId >
< artifactId >spring-boot-configuration-processor< /artifactId >
< optional >true< /optional >
< /dependency >
< /dependencies >
- 定義自動配置類 PulsarAutoConfiguration :
- 引入 Properties ,基于EnableConfigurationProperties與spring-boot-configuration-processor解析 Properties 生成對應(yīng)spring-configuration-metadata.json文件,這樣編寫 application.yml 配置時就可以自動提示配置項的屬性和值了。
- 構(gòu)建一些必須的 Bean,如 PulsarClient、ConsumerFactory、ConsumerFactory 等
- Import 配置 PulsarAnnotationDrivenConfiguration,這個主要是一些額外的配置,用來支持后面的功能
@Configuration
@EnableConfigurationProperties({PulsarProperties.class})
@Import({PulsarAnnotationDrivenConfiguration.class})
public class PulsarAutoConfiguration {
private final PulsarProperties properties;
public PulsarAutoConfiguration(PulsarProperties properties) {
this.properties = properties;
}
@Bean(destroyMethod = "close")
public PulsarClient pulsarClient() {
ClientBuilder clientBuilder = new ClientBuilderImpl(properties);
return clientBuilder.build();
}
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory pulsarConsumerFactory() {
return new DefaultPulsarConsumerFactory(pulsarClient(), properties.getConsumer().buildProperties());
}
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory pulsarProducerFactory() {
return new DefaultPulsarProducerFactory(pulsarClient(), properties.getProducer().buildProperties());
}
}
- 配置 spring.factory
在目錄src/main/resources/META-INF下創(chuàng)建 spring.factories ,內(nèi)容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
com.sucl.pulsar.autoconfigure.PulsarAutoConfiguration
- spring-pulsar
- 添加 pulsar-client 相關(guān)的依賴
< dependencies >
< dependency >
< groupId >org.apache.pulsar< /groupId >
< artifactId >pulsar-client< /artifactId >
< /dependency >
< dependency >
< groupId >org.springframework.boot< /groupId >
< artifactId >spring-boot-autoconfigure< /artifactId >
< /dependency >
< dependency >
< groupId >org.springframework< /groupId >
< artifactId >spring-messaging< /artifactId >
< /dependency >
< /dependencies >
- 定義 EnablePulsar,之前說到過,@Enable 注解主要是配合 AutoConfigure 來做功能加強,沒有了自動配置,我們依然可以使用這些模塊的功能。這里做了一件事,向 Spring 容器注冊了兩個 Bean
- PulsarListenerAnnotationBeanProcessor 在 Spring Bean 生命周期中解析注解自定義注解 PulsarListener、PulsarHandler,
- PulsarListenerEndpointRegistry 用來構(gòu)建 Consumer 執(zhí)行環(huán)境以及對 TOPIC 的監(jiān)聽、觸發(fā)消費回調(diào)等等,可以說是最核心的 Bean
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({PulsarListenerConfigurationSelector.class})
public @interface EnablePulsar {
}
- 定義注解,參考 RabbitMq,主要針對需要關(guān)注的類與方法,分別對應(yīng)注解@PulsarListener、@PulsarHandler,通過這兩個注解配合可以讓我們監(jiān)聽到關(guān)注的 TOPIC, 當(dāng)有消息產(chǎn)生時,觸發(fā)對應(yīng)的方法進(jìn)行消費。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface PulsarListener {
/**
*
* @return TOPIC 支持SPEL
*/
String[] topics() default {};
/**
*
* @return TAGS 支持SPEL
*/
String[] tags() default {};
}
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface PulsarHandler {
}
- 注解@PulsarListener 的處理流程比較復(fù)雜,這里用一張圖描述,或者可以通過下面 github 的源代碼查看具體實現(xiàn)
flow
- spring-pulsar-sample
按照下面的流程,你會發(fā)現(xiàn)通過簡單的幾行代碼就能夠?qū)崿F(xiàn)消息的生產(chǎn)與消費,并集成到項目中去。
- 簡單寫一個 SpringBoot 項目,并添加 pulsar-spring-boot-starter
< dependencies >
< dependency >
< groupId >com.sucl< /groupId >
< artifactId >pulsar-spring-boot-starter< /artifactId >
< version >${project.version}< /version >
< /dependency >
< dependency >
< groupId >org.springframework.boot< /groupId >
< artifactId >spring-boot-starter-web< /artifactId >
< /dependency >
< /dependencies >
- 添加配置
cycads:
pulsar:
service-url: pulsar://localhost:6650
listener-topics: TOPIC_TEST
- 編寫對應(yīng)消費代碼
@Slf4j
@Component
@PulsarListener(topics = "#{'${cycads.listener-topics}'.split(',')}")
public class PulsarDemoListener {
@PulsarHandler
public void onConsumer(Message message){
log.info(" >> > 接收到消息:{}", message.getPayload());
}
}
- 向 Pulsar Broker 發(fā)送消息進(jìn)行測試
@Slf4j
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {ContextConfig.class})
@Import({PulsarAutoConfiguration.class})
public class ProducerTests {
@Autowired
private ProducerFactory producerFactory;
@Test
public void sendMessage() {
Producer producer = producerFactory.createProducer("TOPIC_TEST");
MessageId messageId = producer.send("this is a test message");
log.info(" >> >> >> > 消息發(fā)送完成:{}", messageId);
}
@Configuration
@PropertySource(value = "classpath:application-test.properties")
static class ContextConfig {
//
}
}
- 控制臺可以看到這樣的結(jié)果
2023-02-26 19:57:15.572 INFO 26520 --- [pulsar-01] c.s.p.s.listener.PulsarDemoListener : > >> 接收到消息:GenericMessage [payload=this is a test message, headers={id=f861488c-2afb-b2e7-21a1-f15e9759eec5, timestamp=1677412635571}]
知識點
- Pulsar Client
基于 pulsar-client 提供的 ConfigurationData 擴展 Properties;了解 Pulsar Client 如何連接 Broker 并進(jìn)行消息消費,包括同步消費、異步消費等等
- spring.factories
實現(xiàn) starter 自動配置的關(guān)鍵,基于 SPI 完成配置的自動加載
- Spring Bean 生命周期
通過 Bean 生命周期相關(guān)擴展實現(xiàn)注解的解析與容器的啟動,比如 BeanPostProcessor, BeanFactoryAware, SmartInitializingSingleton, InitializingBean, DisposableBean 等
- Spring Messaging
基于回調(diào)與 MethodHandler 實現(xiàn)消息體的封裝、參數(shù)解析以及方法調(diào)用;
源碼示例
https://github.com/sucls/pulsar-starter.git
結(jié)束語
如果你看過 spring-kafka 的源代碼,那么你會發(fā)現(xiàn)所有代碼基本都是仿造其實現(xiàn)。一方面能夠閱讀 kafka client 在 spring 具體如何實現(xiàn);同時通過編寫自己的 spring starter 模塊,學(xué)習(xí) 整個 starter 的實現(xiàn)過程。
-
模塊
+關(guān)注
關(guān)注
7文章
2811瀏覽量
52279 -
模板
+關(guān)注
關(guān)注
0文章
110瀏覽量
20971 -
代碼
+關(guān)注
關(guān)注
30文章
4921瀏覽量
72204 -
spring
+關(guān)注
關(guān)注
0文章
341瀏覽量
15566 -
kafka
+關(guān)注
關(guān)注
0文章
54瀏覽量
5486
發(fā)布評論請先 登錄
Spring Boot如何實現(xiàn)異步任務(wù)
Spring Boot Starter需要些什么

Spring狀態(tài)機的實現(xiàn)原理和使用方法

java spring教程
什么是java spring
Spring筆記分享
Kafka集群環(huán)境的搭建
「Spring認(rèn)證」什么是Spring GraphQL?

Kafka的概念及Kafka的宕機

Spring Boot實現(xiàn)各種參數(shù)校驗
Kafka 的簡介

物通博聯(lián)5G-kafka工業(yè)網(wǎng)關(guān)實現(xiàn)kafka協(xié)議對接到云平臺
Spring Kafka的各種用法
Kafka架構(gòu)技術(shù):Kafka的架構(gòu)和客戶端API設(shè)計

如何將Kafka使用到我們的后端設(shè)計中

評論