99偷拍视频精品区一区二,口述久久久久久久久久久久,国产精品夫妇激情啪发布,成人永久免费网站在线观看,国产精品高清免费在线,青青草在线观看视频观看,久久久久久国产一区,天天婷婷久久18禁,日韩动漫av在线播放直播

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。

公司主營(yíng)業(yè)務(wù):做網(wǎng)站、成都做網(wǎng)站、移動(dòng)網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來(lái)的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來(lái)驚喜。創(chuàng)新互聯(lián)推出大名免費(fèi)做網(wǎng)站回饋大家。

今天我們就來(lái)談一談消息隊(duì)列的推拉模式,這也是一個(gè)面試熱點(diǎn),例如你在簡(jiǎn)歷里面寫了 RocketMQ ,基本上會(huì)問(wèn)你 RocketMQ 采用的是推模式還是拉模式啊?是拉模式?不是有 PushConsumer 嗎?

今天我們就來(lái)談?wù)勍评J剑⑶以賮?lái)看看 RocketMQ 和 Kafka 是如何做的。

推拉模式

首先明確一下推拉模式到底是在討論消息隊(duì)列的哪一個(gè)步驟,一般而言我們?cè)谡務(wù)?strong>推拉模式的時(shí)候指的是 Comsumer 和 Broker 之間的交互。

默認(rèn)的認(rèn)為 Producer 與 Broker 之間就是推的方式,即 Producer 將消息推送給 Broker,而不是 Broker 主動(dòng)去拉取消息。

想象一下,如果需要 Broker 去拉取消息,那么 Producer 就必須在本地通過(guò)日志的形式保存消息來(lái)等待 Broker 的拉取,如果有很多生產(chǎn)者的話,那么消息的可靠性不僅僅靠 Broker 自身,還需要靠成百上千的 Producer。

Broker 還能靠多副本等機(jī)制來(lái)保證消息的存儲(chǔ)可靠,而成百上千的 Producer 可靠性就有點(diǎn)難辦了,所以默認(rèn)的 Producer 都是推消息給 Broker。

所以說(shuō)有些情況分布式好,而有些時(shí)候還是集中管理好。

推模式

推模式指的是消息從 Broker 推向 Consumer,即 Consumer 被動(dòng)的接收消息,由 Broker 來(lái)主導(dǎo)消息的發(fā)送。

我們來(lái)想一下推模式有什么好處?

消息實(shí)時(shí)性高, Broker 接受完消息之后可以立馬推送給 Consumer。

對(duì)于消費(fèi)者使用來(lái)說(shuō)更簡(jiǎn)單,簡(jiǎn)單啊就等著,反正有消息來(lái)了就會(huì)推過(guò)來(lái)。

推模式有什么缺點(diǎn)?

推送速率難以適應(yīng)消費(fèi)速率,推模式的目標(biāo)就是以最快的速度推送消息,當(dāng)生產(chǎn)者往 Broker 發(fā)送消息的速率大于消費(fèi)者消費(fèi)消息的速率時(shí),隨著時(shí)間的增長(zhǎng)消費(fèi)者那邊可能就“爆倉(cāng)”了,因?yàn)楦鞠M(fèi)不過(guò)來(lái)啊。當(dāng)推送速率過(guò)快就像 DDos 攻擊一樣消費(fèi)者就傻了。

并且不同的消費(fèi)者的消費(fèi)速率還不一樣,身為 Broker 很難平衡每個(gè)消費(fèi)者的推送速率,如果要實(shí)現(xiàn)自適應(yīng)的推送速率那就需要在推送的時(shí)候消費(fèi)者告訴 Broker ,我不行了你推慢點(diǎn)吧,然后 Broker 需要維護(hù)每個(gè)消費(fèi)者的狀態(tài)進(jìn)行推送速率的變更。

這其實(shí)就增加了 Broker 自身的復(fù)雜度。

所以說(shuō)推模式難以根據(jù)消費(fèi)者的狀態(tài)控制推送速率,適用于消息量不大、消費(fèi)能力強(qiáng)要求實(shí)時(shí)性高的情況下。

拉模式

拉模式指的是 Consumer 主動(dòng)向 Broker 請(qǐng)求拉取消息,即 Broker 被動(dòng)的發(fā)送消息給 Consumer。

我們來(lái)想一下拉模式有什么好處?

拉模式主動(dòng)權(quán)就在消費(fèi)者身上了,消費(fèi)者可以根據(jù)自身的情況來(lái)發(fā)起拉取消息的請(qǐng)求。假設(shè)當(dāng)前消費(fèi)者覺(jué)得自己消費(fèi)不過(guò)來(lái)了,它可以根據(jù)一定的策略停止拉取,或者間隔拉取都行。

拉模式下 Broker 就相對(duì)輕松了,它只管存生產(chǎn)者發(fā)來(lái)的消息,至于消費(fèi)的時(shí)候自然由消費(fèi)者主動(dòng)發(fā)起,來(lái)一個(gè)請(qǐng)求就給它消息唄,從哪開始拿消息,拿多少消費(fèi)者都告訴它,它就是一個(gè)沒(méi)有感情的工具人,消費(fèi)者要是沒(méi)來(lái)取也不關(guān)它的事。

拉模式可以更合適的進(jìn)行消息的批量發(fā)送,基于推模式可以來(lái)一個(gè)消息就推送,也可以緩存一些消息之后再推送,但是推送的時(shí)候其實(shí)不知道消費(fèi)者到底能不能一次性處理這么多消息。而拉模式就更加合理,它可以參考消費(fèi)者請(qǐng)求的信息來(lái)決定緩存多少消息之后批量發(fā)送。

拉模式有什么缺點(diǎn)?

消息延遲,畢竟是消費(fèi)者去拉取消息,但是消費(fèi)者怎么知道消息到了呢?所以它只能不斷地拉取,但是又不能很頻繁地請(qǐng)求,太頻繁了就變成消費(fèi)者在攻擊 Broker 了。因此需要降低請(qǐng)求的頻率,比如隔個(gè) 2 秒請(qǐng)求一次,你看著消息就很有可能延遲 2 秒了。

消息忙請(qǐng)求,忙請(qǐng)求就是比如消息隔了幾個(gè)小時(shí)才有,那么在幾個(gè)小時(shí)之內(nèi)消費(fèi)者的請(qǐng)求都是無(wú)效的,在做無(wú)用功。

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

那到底是推還是拉

可以看到推模式和拉模式各有優(yōu)缺點(diǎn),到底該如何選擇呢?

RocketMQ 和 Kafka 都選擇了拉模式,當(dāng)然業(yè)界也有基于推模式的消息隊(duì)列如 ActiveMQ。

我個(gè)人覺(jué)得拉模式更加的合適,因?yàn)楝F(xiàn)在的消息隊(duì)列都有持久化消息的需求,也就是說(shuō)本身它就有個(gè)存儲(chǔ)功能,它的使命就是接受消息,保存好消息使得消費(fèi)者可以消費(fèi)消息即可。

而消費(fèi)者各種各樣,身為 Broker 不應(yīng)該有依賴于消費(fèi)者的傾向,我已經(jīng)為你保存好消息了,你要就來(lái)拿好了。

雖說(shuō)一般而言 Broker 不會(huì)成為瓶頸,因?yàn)橄M(fèi)端有業(yè)務(wù)消耗比較慢,但是 Broker 畢竟是一個(gè)中心點(diǎn),能輕量就盡量輕量。

那么竟然 RocketMQ 和 Kafka 都選擇了拉模式,它們就不怕拉模式的缺點(diǎn)么?怕,所以它們操作了一波,減輕了拉模式的缺點(diǎn)。

長(zhǎng)輪詢

RocketMQ 和 Kafka 都是利用“長(zhǎng)輪詢”來(lái)實(shí)現(xiàn)拉模式,我們就來(lái)看看它們是如何操作的。

為了簡(jiǎn)單化,下面我把消息不滿足本次拉取的條數(shù)啊、總大小啊等等都統(tǒng)一描述成還沒(méi)有消息,反正都是不滿足條件。

RocketMQ 中的長(zhǎng)輪詢

RocketMQ 中的 PushConsumer 其實(shí)是披著推模式實(shí)際上是拉模式的方法,只是看起來(lái)像推模式而已

因?yàn)?RocketMQ 在被背后偷偷的幫我們?nèi)?Broker 請(qǐng)求數(shù)據(jù)了。

后臺(tái)會(huì)有個(gè) RebalanceService 線程,這個(gè)線程會(huì)根據(jù) topic 的隊(duì)列數(shù)量和當(dāng)前消費(fèi)組的消費(fèi)者個(gè)數(shù)做負(fù)載均衡,每個(gè)隊(duì)列產(chǎn)生的 pullRequest 放入阻塞隊(duì)列 pullRequestQueue 中。然后又有個(gè) PullMessageService 線程不斷的從阻塞隊(duì)列 pullRequestQueue 中獲取 pullRequest,然后通過(guò)網(wǎng)絡(luò)請(qǐng)求 broker,這樣實(shí)現(xiàn)的準(zhǔn)實(shí)時(shí)拉取消息。

這一部分代碼我不截了,就是這么個(gè)事兒,稍后會(huì)用圖來(lái)展示。

然后 Broker 的 PullMessageProcessor 里面的 processRequest 方法是用來(lái)處理拉消息請(qǐng)求的,有消息就直接返回,如果沒(méi)有消息怎么辦呢?我們來(lái)看一下代碼。

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

我們?cè)賮?lái)看下 suspendPullRequest 方法做了什么。

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

而 PullRequestHoldService 這個(gè)線程會(huì)每 5 秒從 pullRequestTable 取PullRequest請(qǐng)求,然后看看待拉取消息請(qǐng)求的偏移量是否小于當(dāng)前消費(fèi)隊(duì)列最大偏移量,如果條件成立則說(shuō)明有新消息了,則會(huì)調(diào)用 notifyMessageArriving ,最終調(diào)用 PullMessageProcessor 的 executeRequestWhenWakeup() 方法重新嘗試處理這個(gè)消息的請(qǐng)求,也就是再來(lái)一次,整個(gè)長(zhǎng)輪詢的時(shí)間默認(rèn) 30 秒。

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

簡(jiǎn)單的說(shuō)就是 5 秒會(huì)檢查一次消息時(shí)候到了,如果到了則調(diào)用 processRequest 再處理一次。這好像不太實(shí)時(shí)啊?5秒?

別急,還有個(gè) ReputMessageService 線程,這個(gè)線程用來(lái)不斷地從 commitLog 中解析數(shù)據(jù)并分發(fā)請(qǐng)求,構(gòu)建出 ConsumeQueue 和 IndexFile 兩種類型的數(shù)據(jù),并且也會(huì)有喚醒請(qǐng)求的操作,來(lái)彌補(bǔ)每 5s 一次這么慢的延遲

代碼我就不截了,就是消息寫入并且會(huì)調(diào)用 pullRequestHoldService#notifyMessageArriving。

最后我再來(lái)畫個(gè)圖,描述一下整個(gè)流程。

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

Kafka 中的長(zhǎng)輪詢

像 Kafka 在拉請(qǐng)求中有參數(shù),可以使得消費(fèi)者請(qǐng)求在 “長(zhǎng)輪詢” 中阻塞等待。

簡(jiǎn)單的說(shuō)就是消費(fèi)者去 Broker 拉消息,定義了一個(gè)超時(shí)時(shí)間,也就是說(shuō)消費(fèi)者去請(qǐng)求消息,如果有的話?cǎi)R上返回消息,如果沒(méi)有的話消費(fèi)者等著直到超時(shí),然后再次發(fā)起拉消息請(qǐng)求。

并且 Broker 也得配合,如果消費(fèi)者請(qǐng)求過(guò)來(lái),有消息肯定馬上返回,沒(méi)有消息那就建立一個(gè)延遲操作,等條件滿足了再返回。

我們來(lái)簡(jiǎn)單的看一下源碼,為了突出重點(diǎn),我會(huì)刪減一些代碼。

先來(lái)看消費(fèi)者端的代碼。

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

上面那個(gè) poll 接口想必大家都很熟悉,其實(shí)從注解直接就知道了確實(shí)是等待數(shù)據(jù)的到來(lái)或者超時(shí),我們?cè)俸?jiǎn)單的往下看。

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

我們?cè)賮?lái)看下最終 client.poll 調(diào)用的是什么。

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

最后調(diào)用的就是 Kafka 包裝過(guò)的 selector,而最終會(huì)調(diào)用 Java nio 的 select(timeout)

現(xiàn)在消費(fèi)者端的代碼已經(jīng)清晰了,我們?cè)賮?lái)看看 Broker 如何做的

Broker 處理所有請(qǐng)求的入口其實(shí)我在之前的文章介紹過(guò),就在 KafkaApis.scala 文件的 handle 方法下,這次的主角就是 handleFetchRequest 。

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

這個(gè)方法進(jìn)來(lái),我截取最重要的部分。

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

下面的圖片就是 fetchMessages 方法內(nèi)部實(shí)現(xiàn),源碼給的注釋已經(jīng)很清晰了,大家放大圖片看下即可。

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

這個(gè)煉獄名字取得很有趣,簡(jiǎn)單的說(shuō)就是利用我之前文章提到的時(shí)間輪,來(lái)執(zhí)行定時(shí)任務(wù),例如這里是delayedFetchPurgatory,專門用來(lái)處理延遲拉取操作。

我們先簡(jiǎn)單想一下,這個(gè)延遲操作都需要實(shí)現(xiàn)哪些方法,首先構(gòu)建的延遲操作需要有檢查機(jī)制,來(lái)查看消息是否已經(jīng)到了,然后呢還得有個(gè)消息到了之后該執(zhí)行的方法,還需要有執(zhí)行完畢之后該干啥的方法,當(dāng)然還得有個(gè)超時(shí)之后得干啥的方法。

這幾個(gè)方法其實(shí)對(duì)應(yīng)的就是代碼里的 DelayedFetch ,這個(gè)類繼承了 DelayedOperation 內(nèi)部有:

  • isCompleted 檢查條件是否滿足的方法

  • tryComplete 條件滿足之后執(zhí)行的方法

  • onComplete 執(zhí)行完畢之后調(diào)用的方法

  • onExpiration 過(guò)期之后需要執(zhí)行的方法

判斷是否過(guò)期就是由時(shí)間輪來(lái)推動(dòng)判斷的,但是總不能等過(guò)期的時(shí)候再去看消息到了沒(méi)吧?

這里 Kafka 和 RocketMQ 的機(jī)制一樣,也會(huì)在消息寫入的時(shí)候提醒這些延遲請(qǐng)求消息來(lái)了,具體代碼我不貼了, 在 ReplicaManager#appendRecords 方法內(nèi)部再深入個(gè)兩方法可以看到。

不過(guò)雖說(shuō)代碼不貼,圖還是要畫一下的。

RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式

RocketMQ  和 Kafka 都是采用“長(zhǎng)輪詢”的機(jī)制,具體的做法都是通過(guò)消費(fèi)者等待消息,當(dāng)有消息的時(shí)候 Broker 會(huì)直接返回消息,如果沒(méi)有消息都會(huì)采取延遲處理的策略,并且為了保證消息的及時(shí)性,在對(duì)應(yīng)隊(duì)列或者分區(qū)有新消息到來(lái)的時(shí)候都會(huì)提醒消息來(lái)了,及時(shí)返回消息。

一句話說(shuō)就是消費(fèi)者和 Broker 相互配合,拉取消息請(qǐng)求不滿足條件的時(shí)候 hold 住,避免了多次頻繁的拉取動(dòng)作,當(dāng)消息一到就提醒返回。

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對(duì)創(chuàng)新互聯(lián)的支持。

新聞標(biāo)題:RocketMQ和Kafka是如何實(shí)現(xiàn)消息隊(duì)列的推拉模式
當(dāng)前網(wǎng)址:http://www.yijiale78.com/article6/ghdeig.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App開發(fā)網(wǎng)站建設(shè)營(yíng)銷型網(wǎng)站建設(shè)微信公眾號(hào)做網(wǎng)站網(wǎng)站策劃

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)