消息隊列使用場景 為什么會需要消息隊列(MQ)?
解耦
在項目啟動之初來預(yù)測將來項目會碰到什么需求,是極其困難的。消息系統(tǒng)在處理過程中間插入了一個隱含的、基于數(shù)據(jù)的接口層,兩邊的處理過程都要實現(xiàn)這一接口。這允許你獨立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
冗余
有些情況下,處理數(shù)據(jù)的過程會失敗。除非數(shù)據(jù)被持久化,否則將造成丟失。消息隊列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險。許多消息隊列所采用的”插入-獲取-刪除”范式中,在把一個消息從隊列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。
擴(kuò)展性
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調(diào)節(jié)參數(shù)。擴(kuò)展就像調(diào)大電力按鈕一樣簡單。
靈活性 & 峰值處理能力
在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見;如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會因為突發(fā)的超負(fù)荷的請求而完全崩潰。
可恢復(fù)性
系統(tǒng)的一部分組件失效時,不會影響到整個系統(tǒng)。消息隊列降低了進(jìn)程間的耦合度,所以即使一個處理消息的進(jìn)程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。
順序保證
在大多使用場景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數(shù)據(jù)會按照特定的順序來處理。Kafka保證一個Partition內(nèi)的消息的有序性。
緩沖
在任何重要的系統(tǒng)中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應(yīng)用過濾器花費更少的時間。消息隊列通過一個緩沖層來幫助任務(wù)最高效率的執(zhí)行———寫入隊列的處理會盡可能的快速。該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度。
異步通信
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
MQ常用的使用場景:
1. 進(jìn)程間通訊和系統(tǒng)間的消息通知,比如在分布式系統(tǒng)中。
2. 解耦,比如像我們公司有許多開發(fā)團(tuán)隊,每個團(tuán)隊負(fù)責(zé)業(yè)務(wù)的不同模塊,各個開發(fā)團(tuán)隊可以使用MQ來通信。
3. 在一些高并發(fā)場景下,使用MQ的異步特性。
消息隊列和RPC對比 系統(tǒng)架構(gòu)
RPC系統(tǒng)結(jié)構(gòu):
+----------++----------+|Consumer|<=>|Provider|+----------++----------+
Consumer調(diào)用的Provider提供的服務(wù)。
Message Queue系統(tǒng)結(jié)構(gòu):
+--------++-------++----------+|Sender|<=>|Queue|<=>|Receiver|+--------++-------++----------+
Sender發(fā)送消息給Queue;Receiver從Queue拿到消息來處理。
功能特點
在架構(gòu)上,RPC和Message Queue的差異點是,Message Queue有一個中間結(jié)點Message Queue(broker),可以把消息存儲。
消息隊列的特點
Message Queue把請求的壓力保存一下,逐漸釋放出來,讓處理者按照自己的節(jié)奏來處理。
Message Queue引入一下新的結(jié)點,讓系統(tǒng)的可靠性會受Message Queue結(jié)點的影響。
Message Queue是異步單向的消息。發(fā)送消息設(shè)計成是不需要等待消息處理的完成。
所以對于有同步返回需求,用Message Queue則變得麻煩了。
RPC的特點
同步調(diào)用,對于要等待返回結(jié)果/處理結(jié)果的場景,RPC是可以非常自然直覺的使用方式。RPC也可以是異步調(diào)用。
由于等待結(jié)果,Consumer(Client)會有線程消耗。
如果以異步RPC的方式使用,Consumer(Client)線程消耗可以去掉。但不能做到像消息一樣暫存消息/請求,壓力會直接傳導(dǎo)到服務(wù)Provider。
RPC適用場合說明
希望同步得到結(jié)果的場合,RPC合適。
希望使用簡單,則RPC;RPC操作基于接口,使用簡單,使用方式模擬本地調(diào)用。異步的方式編程比較復(fù)雜。
不希望發(fā)送端(RPC Consumer、Message Sender)受限于處理端(RPC Provider、Message Receiver)的速度時,使用Message Queue。
隨著業(yè)務(wù)增長,有的處理端處理量會成為瓶頸,會進(jìn)行同步調(diào)用到異步消息的改造。這樣的改造實際上有調(diào)整業(yè)務(wù)的使用方式。
比如原來一個操作頁面提交后就下一個頁面會看到處理結(jié)果;改造后異步消息后,下一個頁面就會變成“操作已提交,完成后會得到通知”。
RPC不適用場合說明
RPC同步調(diào)用使用Message Queue來傳輸調(diào)用信息。 上面分析可以知道,這樣的做法,發(fā)送端是在等待,同時占用一個中間點的資源。變得復(fù)雜了,但沒有對等的收益。
對于返回值是void的調(diào)用,可以這樣做,因為實際上這個調(diào)用業(yè)務(wù)上往往不需要同步得到處理結(jié)果的,只要保證會處理即可。(RPC的方式可以保證調(diào)用返回即處理完成,使用消息方式后這一點不能保證了。)
返回值是void的調(diào)用,使用消息,效果上是把消息的使用方式Wrap成了服務(wù)調(diào)用(服務(wù)調(diào)用使用方式成簡單,基于業(yè)務(wù)接口)。
常用的消息隊列及使用場景 ActiveMQ
AcitveMQ是作為一種消息存儲和分發(fā)組件,涉及到client與broker端數(shù)據(jù)交互的方方面面,它不僅要擔(dān)保消息的存儲安全性,還要提供額外的手段來確保消息的分發(fā)是可靠的。
ActiveMQ消息傳送機制
Producer客戶端使用來發(fā)送消息的, Consumer客戶端用來消費消息;它們的協(xié)同中心就是ActiveMQ broker,broker也是讓producer和consumer調(diào)用過程解耦的工具,最終實現(xiàn)了異步RPC/數(shù)據(jù)交換的功能。隨著ActiveMQ的不斷發(fā)展,支持了越來越多的特性,也解決開發(fā)者在各種場景下使用ActiveMQ的需求。比如producer支持異步調(diào)用;使用flow control機制讓broker協(xié)同consumer的消費速率;consumer端可以使用prefetchACK來最大化消息消費的速率;提供”重發(fā)策略”等來提高消息的安全性等。在此我們不詳細(xì)介紹。
一條消息的生命周期如下:
?
圖片中簡單的描述了一條消息的生命周期,不過在不同的架構(gòu)環(huán)境中,message的流動行可能更加復(fù)雜.將在稍后有關(guān)broker的架構(gòu)中詳解..一條消息從producer端發(fā)出之后,一旦被broker正確保存,那么它將會被consumer消費,然后ACK,broker端才會刪除;不過當(dāng)消息過期或者存儲設(shè)備溢出時,也會終結(jié)它。
ActiveMQ的安裝
啟動后,activeMQ會占用兩個端口,一個是負(fù)責(zé)接收發(fā)送消息的tcp端口:61616,一個是基于web負(fù)責(zé)用戶界面化管理的端口:8161。這兩個端口可以在conf下面的xml中找到。http服務(wù)器使用了jettry。這里有個問題是啟動mq后,很長時間管理界面才可以顯示出來。可以使用netstat -an|find “61616”來測試ActiveMQ是否啟動。
Jms與ActiveMQ的結(jié)合
JMS是一個用于提供消息服務(wù)的技術(shù)規(guī)范,它制定了在整個消息服務(wù)提供過程中的所有數(shù)據(jù)結(jié)構(gòu)和交互流程。而MQ則是消息隊列服務(wù),是面向消息中間件(MOM)的最終實現(xiàn),是真正的服務(wù)提供者;MQ的實現(xiàn)可以基于JMS,也可以基于其他規(guī)范或標(biāo)準(zhǔn)。目前選擇的最多的是ActiveMQ。
JMS支持兩種消息傳遞模型:點對點(point-to-point,簡稱PTP)和發(fā)布/訂閱(publish/subscribe,簡稱pub/sub)。這兩種消息傳遞模型非常相似,但有以下區(qū)別:
PTP消息傳遞模型規(guī)定了一條消息之恩能夠傳遞費一個接收方。
Pub/sub消息傳遞模型允許一條消息傳遞給多個接收方
點對點模型
通過點對點的消息傳遞模型,一個應(yīng)用程序可以向另外一個應(yīng)用程序發(fā)送消息。在此傳遞模型中,目標(biāo)類型是隊列。消息首先被傳送至隊列目標(biāo),然后從該隊列將消息傳送至對此隊列進(jìn)行監(jiān)聽的某個消費者,如下圖:
?
一個隊列可以關(guān)聯(lián)多個隊列發(fā)送方和接收方,但一條消息僅傳遞給一個接收方。如果多個接收方正在監(jiān)聽隊列上的消息,JMS Provider將根據(jù)“先來者優(yōu)先”的原則確定由哪個價售房接受下一條消息。如果沒有接收方在監(jiān)聽隊列,消息將保留在隊列中,直至接收方連接到隊列為止。這種消息傳遞模型是傳統(tǒng)意義上的拉模型或輪詢模型。在此列模型中,消息不時自動推動給客戶端的,而是要由客戶端從隊列中請求獲得。
點對點模型的代碼(springboot+jms+activemq)實現(xiàn)如下:
@Service("queueproducer")publicclassQueueProducer{@Autowired// 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進(jìn)行了封裝privateJmsMessagingTemplate jmsMessagingTemplate;// 發(fā)送消息,destination是發(fā)送到的隊列,message是待發(fā)送的消息@Scheduled(fixedDelay=3000)//每3s執(zhí)行1次publicvoidsendMessage(Destination destination,finalString message){ jmsMessagingTemplate.convertAndSend(destination, message); }@JmsListener(destination="out.queue")publicvoidconsumerMessage(String text){ System.out.println("從out.queue隊列收到的回復(fù)報文為:"+text); } }
Producer的實現(xiàn)
@ComponentpublicclassQueueConsumer2{// 使用JmsListener配置消費者監(jiān)聽的隊列,其中text是接收到的消息@JmsListener(destination ="mytest.queue")//SendTo 該注解的意思是將return回的值,再發(fā)送的"out.queue"隊列中@SendTo("out.queue")publicStringreceiveQueue(String text) { System.out.println("QueueConsumer2收到的報文為:"+text);return"return message "+text; } }
Consumer的實現(xiàn)
@RunWith(SpringRunner.class)@SpringBootTestpublicclassActivemqQueueTests{@AutowiredprivateQueueProducer producer;@TestpublicvoidcontextLoads()throwsInterruptedException { Destination destination =newActiveMQQueue("mytest.queue");for(inti=0; i<10; i++){ producer.sendMessage(destination,"myname is Flytiger"+ i); } } }
Test的實現(xiàn)
其中QueueConsumer2表明的是一個雙向隊列。
發(fā)布/訂閱模型
通過發(fā)布/訂閱消息傳遞模型,應(yīng)用程序能夠?qū)⒁粭l消息發(fā)送到多個接收方。在此傳送模型中,目標(biāo)類型是主題。消息首先被傳送至主題目標(biāo),然后傳送至所有已訂閱此主題的或送消費者。如下圖:
?
主題目標(biāo)也支持長期訂閱。長期訂閱表示消費者已注冊了主題目標(biāo),但在消息到達(dá)目標(biāo)時該消費者可以處于非活動狀態(tài)。當(dāng)消費者再次處于活動狀態(tài)時,將會接收該消息。如果消費者均沒有注冊某個主題目標(biāo),該主題只保留注冊了長期訂閱的非活動消費者的消息。與PTP消息傳遞模型不同,pub/sub消息傳遞模型允許多個主題訂閱者接收同一條消息。JMS一直保留消息,直至所有主題訂閱者都接收到消息為止。pub/sub消息傳遞模型基本上是一個推模型。在該模型中,消息會自動廣播,消費者無須通過主動請求或輪詢主題的方法來獲得新的消息。
上面兩種消息傳遞模型里,我們都需要定義消息生產(chǎn)者和消費者,生產(chǎn)者把消息發(fā)送到JMS Provider的某個目標(biāo)地址(Destination),消息從該目標(biāo)地址傳送至消費者。消費者可以同步或異步接收消息,一般而言,異步消息消費者的執(zhí)行和伸縮性都優(yōu)于同步消息接收者,體現(xiàn)在:
1. 異步消息接收者創(chuàng)建的網(wǎng)絡(luò)流量比較小。單向?qū)|消息,并使之通過管道進(jìn)入消息監(jiān)聽器。管道操作支持將多條消息聚合為一個網(wǎng)絡(luò)調(diào)用。
2. 異步消息接收者使用線程比較少。異步消息接收者在不活動期間不使用線程。同步消息接收者在接收調(diào)用期間內(nèi)使用線程,結(jié)果線程可能會長時間保持空閑,尤其是如果該調(diào)用中指定了阻塞超時。
3. 對于服務(wù)器上運行的應(yīng)用程序代碼,使用異步消息接收者幾乎總是最佳選擇,尤其是通過消息驅(qū)動Bean。使用異步消息接收者可以防止應(yīng)用程序代碼在服務(wù)器上執(zhí)行阻塞操作。而阻塞操作會是服務(wù)器端線程空閑,甚至?xí)?dǎo)致死鎖。阻塞操作使用所有線程時則發(fā)生死鎖。如果沒有空余的線程可以處理阻塞操作自身解鎖所需的操作,這該操作永遠(yuǎn)無法停止阻塞。
發(fā)布/訂閱模型的代碼(springboot+jms+activemq)實現(xiàn)如下:
@Service("topicproducer")publicclassTopicProducer{@Autowired// 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進(jìn)行了封裝privateJmsMessagingTemplate jmsMessagingTemplate;// 發(fā)送消息,destination是發(fā)送到的隊列,message是待發(fā)送的消息@Scheduled(fixedDelay=3000)//每3s執(zhí)行1次publicvoidsendMessage(Destination destination,finalString message){ jmsMessagingTemplate.convertAndSend(destination, message); } }
Producer的實現(xiàn)
@ComponentpublicclassTopicConsumer2 {// 使用JmsListener配置消費者監(jiān)聽的隊列,其中text是接收到的消息@JmsListener(destination ="mytest.topic")publicvoidreceiveTopic(String text) { System.out.println("TopicConsumer2收到的topic報文為:"+text); } }
Consumer的實現(xiàn)
@RunWith(SpringRunner.class)@SpringBootTestpublicclassActivemqTopicTests{@AutowiredprivateTopicProducer producer;@TestpublicvoidcontextLoads()throwsInterruptedException { Destination destination =newActiveMQTopic("mytest.topic");for(inti=0; i<3; i++){ producer.sendMessage(destination,"myname is TopicFlytiger"+ i); } } }
Test的實現(xiàn)
Topic模式工作時,默認(rèn)只能發(fā)送和接收queue消息,如果要發(fā)送和接收topic消息,需要加入:
spring.jms.pub-sub-domain=true Queue與Topic的比較
JMS Queue執(zhí)行l(wèi)oad balancer語義
一條消息僅能被一個consumer收到。如果在message發(fā)送的時候沒有可用的consumer,那么它講被保存一直到能處理該message的consumer可用。如果一個consumer收到一條message后卻不響應(yīng)它,那么這條消息將被轉(zhuǎn)到另外一個consumer那兒。一個Queue可以有很多consumer,并且在多個可用的consumer中負(fù)載均衡。
Topic實現(xiàn)publish和subscribe語義
一條消息被publish時,他將發(fā)送給所有感興趣的訂閱者,所以零到多個subscriber將接收到消息的一個拷貝。但是在消息代理接收到消息時,只有激活訂閱的subscriber能夠獲得消息的一個拷貝。
分別對應(yīng)兩種消息模式
Point-to-Point(點對點),Publisher/Subscriber Model(發(fā)布/訂閱者)
其中在Publicher/Subscriber模式下又有Nondurable subscription(非持久化訂閱)和durable subscription(持久化訂閱)兩種消息處理方式。
ActiveMQ優(yōu)缺點
優(yōu)點:是一個快速的開源消息組件(框架),支持集群,同等網(wǎng)絡(luò),自動檢測,TCP,SSL,廣播,持久化,XA,和J2EE1.4容器無縫結(jié)合,并且支持輕量級容器和大多數(shù)跨語言客戶端上的Java虛擬機。消息異步接受,減少軟件多系統(tǒng)集成的耦合度。消息可靠接收,確保消息在中間件可靠保存,多個消息也可以組成原子事務(wù)。
缺點:ActiveMQ默認(rèn)的配置性能偏低,需要優(yōu)化配置,但是配置文件復(fù)雜,ActiveMQ本身不提供管理工具;示例代碼少;主頁上的文檔看上去比較全面,但是缺乏一種有效的組織方式,文檔只有片段,用戶很難由淺入深進(jìn)行了解,二、文檔整體的專業(yè)性太強。在研究階段可以通過查maillist、看Javadoc、分析源代碼來了解。
RabbitMQ 簡介
Rabbitmq簡介可以參考我的兩篇文章:
openstack的RPC機制之AMQP協(xié)議()
RabbitMQ高可用性()
RabbitMQ安裝好之后的默認(rèn)賬號密碼是(guest/guest)
需要注意的是:
多個消費者可以訂閱同一個Queue,這時Queue中的消息會被平均分?jǐn)偨o多個消費者進(jìn)行處理,而不是每個消費者都收到所有的消息并處理。這種分發(fā)方式叫做round-robin(循環(huán)的方式)。
當(dāng)publisher將消息發(fā)給queue的過程中,publisher會指明routing key。Direct模式中,Direct Exchange 根據(jù) Routing Key 進(jìn)行精確匹配,只有對應(yīng)的 Message Queue 會接受到消息。Topic模式中Exchange會根據(jù)routing key和bindkey進(jìn)行模式匹配,決定將消息發(fā)送到哪個queue中。
有一個疑問:當(dāng)有多個consumer時,rabbitmq會平均分?jǐn)偨o這些consumer;沒辦法把同一個message發(fā)給不同的consumer嗎?
我之前的猜想是,當(dāng)有多個consumer使用topic模式訂閱消息時,所有的消息它們都會收到;但如果是direct模式,只有一個consumer會收到消息。(理解錯誤,topic和direct只是publisher用來選擇發(fā)到不同的queue,不是consumer接收消息。一個隊列一個消息只能發(fā)送給一個消費者,不然消費者的ack也會有很多,RabbitMQ Server也不好處理)
RabbitMQ的消息確認(rèn)
默認(rèn)情況下,如果Message 已經(jīng)被某個Consumer正確的接收到了,那么該Message就會被從queue中移除。當(dāng)然也可以讓同一個Message發(fā)送到很多的Consumer。
如果一個queue沒被任何的Consumer Subscribe(訂閱),那么,如果這個queue有數(shù)據(jù)到達(dá),那么這個數(shù)據(jù)會被cache,不會被丟棄。當(dāng)有Consumer時,這個數(shù)據(jù)會被立即發(fā)送到這個Consumer,這個數(shù)據(jù)被Consumer正確收到時,這個數(shù)據(jù)就被從queue中刪除。
那么什么是正確收到呢?通過ack。每個Message都要被acknowledged(確認(rèn),ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數(shù)據(jù)沒有被ack,那么:
RabbitMQ Server會把這個信息發(fā)送到下一個Consumer。而且ack的機制可以起到限流的作用(Benefitto throttling):在Consumer處理完成數(shù)據(jù)后發(fā)送ack,甚至在額外的延時后發(fā)送ack,將有效的balance Consumer的load。
RabbitMQ功能測試
本次測試依然是RabbitMQ+springboot,首先需要application.properties
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672spring.rabbitmq.username=guest spring.rabbitmq.password=guest
這里的端口是5672,,15672時管理端的端口。
pom要添加依賴:
評論