RocketMQ生產(chǎn)者為什么需要負載均衡?
在RocketMQ中,隊列是消息發(fā)送的基本單位。每個Topic下可能存在多個隊列,因此一個生產(chǎn)者實例可以向不同的隊列發(fā)送消息。當生產(chǎn)者發(fā)送消息時,如果不能均衡的將消息發(fā)送到不同的隊列,那么會導致隊列里的消息分布不均衡,這樣最終會導致消息性能下降,因此生產(chǎn)者負載均衡機制也是非常重要的。
RocketMQ生產(chǎn)者原理分析
既然生產(chǎn)者負載均衡如此重要,我們看下是如何實現(xiàn)的。
我們通常使用如下方法發(fā)送消息:
構(gòu)建消息
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//發(fā)送消息
SendResult sendResult = producer.send(msg);
RocketMQ發(fā)送消息的核心邏輯在DefaultMQProducerImpl類sendDefaultImpl。

在發(fā)送消息流程利里面有一行非常關(guān)鍵的邏輯,selectOneMessageQueue,看方法名稱就可以知道其含義,選擇一個消息隊列。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
里面是通過策略類來實現(xiàn)的。

策略類最終通過org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String) 實現(xiàn)。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
//生產(chǎn)者第一次發(fā)消息
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
//非第一次,重試發(fā)消息的情況,
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
//重試的情況,不取上一個broker的隊列
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
第一次發(fā)消息選擇隊列核心邏輯在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue()
//線程安全的index
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
public MessageQueue selectOneMessageQueue() {
//獲取一個基礎(chǔ)索引,每次自增1 這個全局存在TopicPublishInfo 每一個topic
int index = this.sendWhichQueue.getAndIncrement();
// 基礎(chǔ)索引和 消息寫隊列大小 進行取模 用來實現(xiàn)輪訓的算法
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
哈哈,這里就是生產(chǎn)者負載均衡輪詢機制的核心邏輯了,使用到了ThreadLocal技術(shù),sendWhichQueue為每個生產(chǎn)者線程維護一個自己的下標索引。
基礎(chǔ)索引計算器,使用ThreadLocal技術(shù)針對不同的生產(chǎn)者線程第一次隨機,后面遞增,可以更加負載均衡。
public class ThreadLocalIndex {
//關(guān)鍵技術(shù)
private final ThreadLocal threadLocalIndex = new ThreadLocal();
private final Random random = new Random();
public int getAndIncrement() {
Integer index = this.threadLocalIndex.get();
if (null == index) {
//第一次隨機
index = Math.abs(random.nextInt());
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
}
//第二次索引位置開始自增1
index = Math.abs(index + 1);
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
return index;
}
}
哈哈,有沒有覺得這個實現(xiàn)非常巧妙了。不同的生產(chǎn)者線程都擁有自己的索引因子,分配隊列更加均衡。
總結(jié)
本文分析了RocketMQ生產(chǎn)者底層的實現(xiàn),設(shè)計地方有巧妙之處,值得我們學習,上面是發(fā)送非順序消息的場景, 如果是順序消息,我們作為使用者可以指定負載均衡策略。
編輯:黃飛
-
負載均衡
+關(guān)注
關(guān)注
0文章
128瀏覽量
12822 -
線程
+關(guān)注
關(guān)注
0文章
508瀏覽量
20772 -
消息隊列
+關(guān)注
關(guān)注
0文章
34瀏覽量
3245
原文標題:RocketMQ生產(chǎn)者負載均衡(輪詢機制)核心原理
文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄

RocketMQ生產(chǎn)者為什么需要負載均衡?
評論