01、簡介
Xline 是一款開源的分布式 KV 存儲引擎,用于管理少量的關(guān)鍵性數(shù)據(jù),其核心目標(biāo)是實(shí)現(xiàn)高性能的數(shù)據(jù)訪問,以及保證跨數(shù)據(jù)中心場景下的強(qiáng)一致性。 Xline 對外提供了一系列兼容 etcd 的訪問接口,比如 KV、Watch、Lease 等等。本文將會著重介紹一下其中的 Lease 接口 。
Lease 是一種客戶端和服務(wù)端之間的租約機(jī)制。類似于我們現(xiàn)實(shí)生活中的租車服務(wù),當(dāng)我們需要使用一輛車時,我們可以向租車公司申請一個 lease,租車公司會給我們分配一輛車,并且保證在我們和租車公司約定的有效期內(nèi)不會把這輛車分配給其他人,如果我們想要延長使用時間,我們可以向租車公司續(xù)租,如果我們不再需要使用這輛車,我們可以主動歸還并取消,或者等待 lease 過期后自動歸還。
在 Xline 中對 lease 的使用和現(xiàn)實(shí)生活中的租車服務(wù)很相似,客戶端可以向服務(wù)點(diǎn)申請一個 lease,服務(wù)端會保證在 lease 的有效期內(nèi)不會刪除這個 lease,客戶端也可以通過相應(yīng)的接口提前結(jié)束或者延長 lease 的時間,與現(xiàn)實(shí)中租車不同的是,我們可以在這個 lease 上綁定一些 key-value,這些 key-value 會隨著 lease 的過期被刪除。
根據(jù)以上介紹的 lease 的能力,我們可以在很多場景下使用 lease 來實(shí)現(xiàn)我們的目的,以下是幾個常見的 lease 應(yīng)用場景:
- 分布式鎖: 分布式鎖是通過多個機(jī)制一同實(shí)現(xiàn)的,lease 在分布式鎖中起到避免死鎖的作用??蛻舳嗽谡埱蠓植际芥i的時候,會創(chuàng)建一個 lease 并不斷續(xù)租,并且寫入 key-value 并附加該 lease,這個 key-value 代表分布式鎖的占用狀態(tài),如果占用該鎖的客戶端因故障無法主動釋放鎖,lease 機(jī)制也會保證在 lease 過期后自動刪除對應(yīng)的 key-value 來釋放當(dāng)前鎖。
- 服務(wù)注冊中心: 注冊新服務(wù)時創(chuàng)建 lease,并寫入服務(wù)相關(guān)信息的 key-value 附加該 lease,在服務(wù)存活期間,對應(yīng)服務(wù)會一直對其 lease 續(xù)租,服務(wù)故障后無法自動續(xù)租,對應(yīng) key-value 自動刪除,相應(yīng)的服務(wù)就會在注冊中心中注銷。
- 分布式系統(tǒng)中的授權(quán)管理: 客戶端通過申請 lease 來獲取資源的訪問權(quán)限,如果客戶端失去與服務(wù)端的連接,或者由于故障沒有及時續(xù)租,導(dǎo)致 lease 過期,該客戶端就會失去相應(yīng)的權(quán)限
02、架構(gòu)

上圖是一個 lease 實(shí)現(xiàn)的簡單架構(gòu)圖,外部 Client 可以通過兩種方式向Xline集群發(fā)送請求,一種是直接通過 Curp 協(xié)議向集群內(nèi)所有節(jié)點(diǎn)廣播請求,Curp 模塊達(dá)成共識后,會把這個請求應(yīng)用到狀態(tài)機(jī),也就是將其寫入存儲層;另一種發(fā)送請求的方式就是 Client 直接將請求發(fā)送到集群中一個節(jié)點(diǎn)的 LeaseServer,這也是與 etcd 兼容的請求方式,請求到達(dá) LeaseServer 后,會有兩條不同的處理路徑,多數(shù)請求會通過 Server 端綁定的 Curp client 廣播給集群中所有節(jié)點(diǎn),剩下的少部分請求可能只有部分節(jié)點(diǎn)能夠處理,這些請求就會被轉(zhuǎn)發(fā)到這些節(jié)點(diǎn)的 LeaseServer,然后應(yīng)用到狀態(tài)機(jī)。
03、源碼分析
源碼組織
Lease 相關(guān)的源碼主要保存在以下文件中,大致分為三個部分:
- RPC 定義:
xlineapi/proto/rpc.proto:Xline 內(nèi)各 Server 的 rpc 接口定義,包括 LeaseServer接口定義。xlineapi/proto/lease.proto:lease 的 rpc message 定義。
- LeaseServer實(shí)現(xiàn):
xline/src/server/lease_server.rs:負(fù)責(zé)提供 Lease RPC service 的具體實(shí)現(xiàn),主要目的是提供 etcd 兼容接口,如果使用外部的 curp client 直接發(fā)送 propose 可以不經(jīng)過此接口,但也有部分不經(jīng)過共識協(xié)議的請求必須通過 LeaseServer 處理。
LeaseStore實(shí)現(xiàn):xline/src/storage/lease_store/lease.rs:定義了Lease數(shù)據(jù)結(jié)構(gòu),用于保存 Lease相關(guān)的信息,比如 Lease 上綁定的所有 Key, Lease 的過期時間,Lease 的剩余 TTL 長度等。并為其實(shí)現(xiàn)了一些實(shí)用的方法。xline/src/storage/lease_store/lease_queue.rs:定義了LeaseQueue和相關(guān)的方法,LeaseQueue是一個由 lease id 以及 lease 過期時間組成的優(yōu)先隊(duì)列,一個后臺常駐 task 會定時通過此結(jié)構(gòu)獲取所有過期 lease 的 id。xline/src/storage/lease_store/lease_collection.rs:定義了LeaseCollection和相關(guān)的方法,LeasCollection是 lease 核心數(shù)據(jù)結(jié)構(gòu)的集合,提供 lease 機(jī)制的核心能力。結(jié)構(gòu)內(nèi)部主要包含三個部分,lease_map保存所有 lease 結(jié)構(gòu);item_map緩存 key 到 lease id 映射;expired_queue管理 lease 過期時間,expired_queue只在 leader 節(jié)點(diǎn)上有意義,其它節(jié)點(diǎn)上為空。xline/src/storage/lease_store/mod.rs:LeaseStore的定義及方法實(shí)現(xiàn)。負(fù)責(zé)提供 lease 的存儲層抽象,對外提供所有 lease 相關(guān)操作的存儲層接口。其內(nèi)部包含LeaseCollection以及和KvStore共享的一些數(shù)據(jù)結(jié)構(gòu)。
Lease 的創(chuàng)建
想要使用 lease,首先就要創(chuàng)建一個 lease,創(chuàng)建 lease 時需要使用 LeaseServer 提供的 LeaseGrant 接口。LeaseServer 中對 LeaseGrant 的處理很簡單,就是分配一個 lease id,然后通過 propose 把請求交給共識協(xié)議處理,達(dá)成共識后,請求會在 LeaseStore 中被執(zhí)行。
LeaseStore 會在 LeaseCollection 中創(chuàng)建并插入一個新的 Lease,其核心代碼邏輯如下:
...if is_leader { let expiry = lease.refresh(Duration::ZERO); let _ignore = inner.expired_queue.insert(lease_id, expiry);} else { lease.forever();}let _ignore = inner.lease_map.insert(lease_id, lease.clone());...
需要注意的是,如果當(dāng)前節(jié)點(diǎn)是 leader 節(jié)點(diǎn)的話,還需要承擔(dān)管理 lease 過期時間的任務(wù),所以需要通過refresh 方法計算 Lease 的過期時間,并將其插入到 expired_queue 中。其他節(jié)點(diǎn)則不需要這一步處理,只需要將新的 Lease 插入到 lease_map 中。計算過期時間使用的 refresh 定義如下:
Lease 創(chuàng)建完成后,服務(wù)端會給客戶端返回一個包含 lease id 的響應(yīng)。
Lease的使用

獲取到 lease id 后,客戶端就可以通過 lease id 來使用這個 lease,在 Put 一對 key value 時可以附加 lease id,這個 Put 請求被應(yīng)用到狀態(tài)機(jī)時,除了直接在 KvStore 的 Index和 DB 中寫入 key-value 以外,還會通過LeaseCollection 提供的 detach方法分離當(dāng)前 key 和舊的 lease ,并通過 attach 將需要 put 的 key 附加到新的 lease id 上。
pub(crate) fn attach(&self, lease_id: i64, key: Vec< u8 >) - > Result< (), ExecuteError > { let mut inner = self.inner.write(); let Some(lease) = inner.lease_map.get_mut(&lease_id) else { return Err(ExecuteError::lease_not_found(lease_id)); }; lease.insert_key(key.clone()); let _ignore = inner.item_map.insert(key, lease_id); Ok(())}
attach 的具體實(shí)現(xiàn)就是通過 lease id 找到對應(yīng)的 Lease,并將 key 附加到 Lease上,以及在 item_map中添加 key 到 lease id 的映射關(guān)系。detach 的實(shí)現(xiàn)與 attach的相反,它會移除 attach 時插入的內(nèi)容。
經(jīng)過以上的過程,我們已經(jīng)成功將 key 和 lease id 關(guān)聯(lián)在一起,此時如果這個 Lease 被主動 revoke 或者超時,那么這個 Lease以及它關(guān)聯(lián)的所有 key,都會被刪除。
Lease 的主動刪除
刪除一個 lease 需要調(diào)用 LeaseRevoke接口,這個接口在 LeaseServer 中的處理與 LeaseGrant基本相同,都是將請求交給共識協(xié)議處理,唯一的不同是 LeaseRevoke 不需要分配 lease id。
let del_keys = match self.lease_collection.look_up(req.id) { Some(l) = > l.keys(), None = > return Err(ExecuteError::lease_not_found(req.id)),};if del_keys.is_empty() { let _ignore = self.lease_collection.revoke(req.id); return Ok(Vec::new());}// delete keys ...let _ignore = self.lease_collection.revoke(req.id);
LeaseRevoke 被執(zhí)行時,首先會嘗試查找 Lease 是否有關(guān)聯(lián)的 key,如果沒有,那么就可以直接通過 LeaseCollection 上的 revoke方法將 Lease 刪除,如果有關(guān)聯(lián)的 key 的話那么就需要將關(guān)聯(lián)的所有 key 從 KvStore 中刪除,并清理 LeaseCollection中這些 key 和 lease id 的關(guān)系,然后才能從 LeaseCollection 中 reovke這個 Lease。
Lease 的過期

Lease 過期時的處理流程如上圖所示,此處省略了共識的部分,在初始化 LeaseServer 時,會創(chuàng)建一個后臺常駐的 revoke_expired_leases_task,這個 task 的主體代碼如下:
loop { // only leader will check expired lease if lease_server.lease_storage.is_primary() { for id in lease_server.lease_storage.find_expired_leases() { let _handle = tokio::spawn({ let s = Arc::clone(&lease_server); async move { let request = tonic::Request::new(LeaseRevokeRequest { id }); if let Err(e) = s.lease_revoke(request).await { warn!("Failed to revoke expired leases: {}", e); } } }); } } time::sleep(DEFAULT_LEASE_REQUEST_TIME).await;}
在負(fù)責(zé)管理 Lease 過期時間節(jié)點(diǎn)上,這個 task 會定時通過 find_expired_leases 獲取已經(jīng)過期的所有 lease id, 然后調(diào)用 lease server 上的 lease_revoke 接口來刪除過期的 Lease,這個接口和客戶度主動刪除 Lease 時使用的是同一個接口。
find_expired_leases 是 LeaseCollection 上一個核心方法,具體實(shí)現(xiàn)如下:
pub(crate) fn find_expired_leases(&self) - > Vec< i64 > { let mut expired_leases = vec![]; let mut inner = self.inner.write(); while let Some(expiry) = inner.expired_queue.peek() { if *expiry <= Instant::now() { #[allow(clippy::unwrap_used)] // queue.peek() returns Some let id = inner.expired_queue.pop().unwrap(); if inner.lease_map.contains_key(&id) { expired_leases.push(id); } } else { break; } } expired_leases}
在創(chuàng)建 Lease時,我們已經(jīng)計算過了Lease過期的時間并將其插入了 expired_queue ,調(diào)用 find_expired_queue 時會一直嘗試從優(yōu)先隊(duì)列隊(duì)頭拿出已經(jīng)過期的 Lease ,直到遇到第一個不過期的 Lease 后停止嘗試,然后將拿到的所有 lease id 返回。
Lease 的續(xù)租
如果想要讓創(chuàng)建的 Lease 能夠持續(xù)更長時間,那就需要在客戶端和服務(wù)端之間維護(hù)一條 stream,客戶端定時向服務(wù)端發(fā)送 LeaseKeepAlive請求。和前面提到的請求不同,LeaseKeepAlive請求不需要經(jīng)過共識協(xié)議,因?yàn)檫@個請求依賴只存在于 leader 節(jié)點(diǎn)上的 Lease過期時間,因此只有 leader 節(jié)點(diǎn)能夠處理 LeaseKeepAlive 請求,follower 節(jié)點(diǎn)會把請求轉(zhuǎn)發(fā)至 leader 節(jié)點(diǎn)上處理。具體的轉(zhuǎn)發(fā)邏輯可以參考 lease_server.rs 內(nèi)的源碼。
在 leader 和 client 建立起 stream 后,每當(dāng) leader 從 stream 中收到 lease id,都會為這個 lease 續(xù)租,最終續(xù)租的邏輯是通過 LeaseCollection 提供的 renew 方法實(shí)現(xiàn)的。該方法定義如下:
pub(crate) fn renew(&self, lease_id: i64) - > Result< i64, ExecuteError > { let mut inner = self.inner.write(); let (expiry, ttl) = { let Some(lease) = inner.lease_map.get_mut(&lease_id) else { return Err(ExecuteError::lease_not_found(lease_id)); }; if lease.expired() { return Err(ExecuteError::lease_expired(lease_id)); } let expiry = lease.refresh(Duration::default()); let ttl = lease.ttl().as_secs().cast(); (expiry, ttl) }; let _ignore = inner.expired_queue.update(lease_id, expiry); Ok(ttl)}
Renew 會先檢查對應(yīng) Lease 是否已經(jīng)過期,沒有過期的話就會重新計算過期時間,然后更新它在 expired_queue 中的順序。
只要 client 和 server 之間的連接不中斷,client 就會一直通過 stream 向服務(wù)端發(fā)送 LeaseKeepAlive 請求,這個 lease 也就不會超時,前文提到的 lease 主要的應(yīng)用場景中,幾乎都用到了這個特性來判斷客戶端是否在正常運(yùn)行。
Lease 信息的讀取
Lease 有兩個讀取接口,一個是 LeaseTimeToLive,這個接口會讀取一個 lease 的詳細(xì)信息,包括它的過期時間,和 LeaseKeepAlive 一樣,因?yàn)檫^期時間只存在于 leader 節(jié)點(diǎn),因此該請求需要轉(zhuǎn)發(fā)只 leader 處理;另一個讀取接口是 LeaseLeases,這個接口會列出系統(tǒng)中所有的 lease id,這個接口不需要 lease 過期時間的信息,因此可以直接交給共識協(xié)議處理,所以在 LeaseServer中的處理和 LeaseGrant、LeaseRevoke 相似。此處不再贅述。
LeaseTimeToLive和 LeaseLeases 讀取信息的能力最終由 LeaseCollection 實(shí)現(xiàn),源碼如下:
pub(crate) fn look_up(&self, lease_id: i64) - > Option< Lease > { self.inner.read().lease_map.get(&lease_id).cloned()} pub(crate) fn leases(&self) - > Vec< Lease > { let mut leases = self .inner .read() .lease_map .values() .cloned() .collect::< Vec< _ >>(); leases.sort_by_key(Lease::remaining); leases}
04、總結(jié)
本文介紹了 Xline 下的一個重要接口 Lease,用戶可以通過 Lease 實(shí)現(xiàn)一組 key 的定時過期,并且能夠通過 KeepAlive 接口為 Lease 續(xù)租,服務(wù)端也能夠根據(jù)此特性探測客戶端是否在正常運(yùn)作。依賴于 Lease 機(jī)制的這些特點(diǎn),也誕生出了很多典型的應(yīng)用場景,比如本文介紹過的分布式鎖、服務(wù)注冊中心,授權(quán)管理等等。
-
RPC
+關(guān)注
關(guān)注
0文章
113瀏覽量
12227
發(fā)布評論請先 登錄
Faster Transformer v2.1版本源碼解讀
OneFlow Softmax算子源碼解讀之WarpSoftmax
OneFlow Softmax算子源碼解讀之BlockSoftmax
聊聊Dubbo - Dubbo可擴(kuò)展機(jī)制源碼解析
直播系統(tǒng)源碼選擇二開的好處是什么
闡述FreeRTOS系統(tǒng)中機(jī)制的實(shí)現(xiàn)原理
AP側(cè)中網(wǎng)相關(guān)的PLMN業(yè)務(wù)源碼流程解讀
風(fēng)河:用“商業(yè)機(jī)制”保護(hù)開放源碼的價值
OC的消息轉(zhuǎn)發(fā)機(jī)制的深度解讀
基于EAIDK的人臉?biāo)惴☉?yīng)用-源碼解讀(2)
openharmony源碼解讀
Xline源碼解讀(一)—初識CURP協(xié)議
Xline源碼解讀(三)—CURP Server的實(shí)現(xiàn)
分布式系統(tǒng)中Membership Change 源碼解讀
Xline源碼解讀(二)—Lease的機(jī)制與實(shí)現(xiàn)
評論