chinese直男口爆体育生外卖, 99久久er热在这里只有精品99, 又色又爽又黄18禁美女裸身无遮挡, gogogo高清免费观看日本电视,私密按摩师高清版在线,人妻视频毛茸茸,91论坛 兴趣闲谈,欧美 亚洲 精品 8区,国产精品久久久久精品免费

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內(nèi)不再提示

緩存之美:萬文詳解 Caffeine 實現(xiàn)原理(上)

京東云 ? 來源:jf_75140285 ? 作者:jf_75140285 ? 2025-08-05 14:49 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

文章將采用“總-分-總”的結(jié)構(gòu)對配置固定大小元素驅(qū)逐策略的 Caffeine 緩存進行介紹,首先會講解它的實現(xiàn)原理,在大家對它有一個概念之后再深入具體源碼的細節(jié)之中,理解它的設計理念,從中能學習到用于統(tǒng)計元素訪問頻率的 Count-Min Sketch 數(shù)據(jù)結(jié)構(gòu)、理解內(nèi)存屏障和如何避免緩存?zhèn)喂蚕韱栴}、MPSC 多線程設計模式、高性能緩存的設計思想和多線程間的協(xié)調(diào)方案等等,文章最后會對全文內(nèi)容進行總結(jié),希望大家能有所收獲的同時在未來對本地緩存選型時提供完整的理論依據(jù)。

Caffeine 緩存原理圖如下:

wKgZPGiRqV6ACzEGABFA9ACvlZA860.png

它使用 ConcurrentHashMap 保存數(shù)據(jù),并在該數(shù)據(jù)結(jié)構(gòu)的基礎上創(chuàng)建了窗口區(qū)、試用區(qū)和保護區(qū),用于管理元素的生命周期,各個區(qū)的數(shù)據(jù)結(jié)構(gòu)是使用了 LRU 算法的雙端隊列,隨著緩存的命中率變化,窗口區(qū)和保護區(qū)大小會自動調(diào)節(jié)以適應當前訪問模式。在對元素進行驅(qū)逐時,使用了 TinyLFU 算法,會優(yōu)先將頻率低的元素驅(qū)逐,訪問頻率使用 Count-Min Sketch 數(shù)據(jù)結(jié)構(gòu)記錄,它能在保證較高準確率(93.75%)的情況下占用較少內(nèi)存空間。讀、寫操作分別會向 ReadBuffer 和 WriteBuffer 中添加“讀/寫后任務”,這兩個緩沖區(qū)的設計均采用了 MPSC 多生產(chǎn)者單消費者的多線程設計模式。緩沖區(qū)中任務的消費由維護方法 maintenance 中 drainReadBuffer 和 drainWriteBuffer 實現(xiàn),維護方法通過添加同步鎖,保證任務只由單線程執(zhí)行,這種設計參考了 WAL(Write-Ahead Logging)思想,即:先寫日志,再執(zhí)行操作,先把操作記錄在緩沖區(qū),然后在合適的時機異步、批量地執(zhí)行緩沖區(qū)中的任務。維護方法除了這些作用外,還負責元素在各個分區(qū)的移動、頻率的更新、元素的驅(qū)逐等操作。

接下來的源碼分析以如下測試用例為例:先分析構(gòu)造方法,了解緩存初始化過程中創(chuàng)建的重要數(shù)據(jù)結(jié)構(gòu)和關鍵字段,然后再深入添加元素的方法(put),該方法相對復雜,也是 Caffeine 緩存的核心,理解了這部分內(nèi)容,文章剩余的內(nèi)容理解起來會非常容易,接著分析獲取元素的方法(getIfPresent),最后再回到核心的維護方法 maintenance 中,這樣便基本理解了 Caffeine 緩存的運行原理,需要注意的是,因為我們并未指定緩存元素的過期時間,所以與此相關的內(nèi)容如時間過期策略和時間輪等內(nèi)容不會專門介紹。

public class TestReadSourceCode {

    @Test
    public void doRead() {
        // read constructor
        Cache cache = Caffeine.newBuilder()
                .maximumSize(10_000)
                .build();

        // read put
        cache.put("key", "value");

        // read get
        cache.getIfPresent("key");
    }

}

constructor

Caffeine 的實現(xiàn)類區(qū)分了 BoundedLocalManualCache 和 UnboundedLocalManualCache,見名知意它們分別為“有邊界”的和“無邊界”的緩存。Caffeine#isBounded 方法詮釋了“邊界”的含義:

public final class Caffeine {

    static final int UNSET_INT = -1;

    public  Cache build() {
        // 校驗參數(shù)
        requireWeightWithWeigher();
        requireNonLoadingCache();

        @SuppressWarnings("unchecked")
        Caffeine self = (Caffeine) this;
        return isBounded()
                ? new BoundedLocalCache.BoundedLocalManualCache(self)
                : new UnboundedLocalCache.UnboundedLocalManualCache(self);
    }

    boolean isBounded() {
        // 指定了最大大??;指定了最大權重
        return (maximumSize != UNSET_INT) || (maximumWeight != UNSET_INT)
                // 指定了訪問后過期策略;指定了寫后過期策略
                || (expireAfterAccessNanos != UNSET_INT) || (expireAfterWriteNanos != UNSET_INT)
                // 指定了自定義過期策略;指定了 key 或 value 的引用級別
                || (expiry != null) || (keyStrength != null) || (valueStrength != null);
    }
}

也就是說,當為緩存指定了上述的驅(qū)逐或過期策略會定義為有邊界的 BoundedLocalManualCache 緩存,它會限制緩存的大小,防止內(nèi)存溢出,否則為無邊界的 UnboundedLocalManualCache 類型,它沒有大小限制,直到內(nèi)存耗盡。我們以創(chuàng)建配置了固定大小的緩存為例,它對應的類型便是 BoundedLocalManualCache,在執(zhí)行構(gòu)造方法時,有以下邏輯:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef
        implements LocalCache {
    // ...

    static class BoundedLocalManualCache implements LocalManualCache, Serializable {
        private static final long serialVersionUID = 1;

        final BoundedLocalCache cache;

        BoundedLocalManualCache(Caffeine builder) {
            this(builder, null);
        }

        BoundedLocalManualCache(Caffeine builder, @Nullable CacheLoader loader) {
            cache = LocalCacheFactory.newBoundedLocalCache(builder, loader, /* async */ false);
        }
    }
}

BoundedLocalCache 為抽象類,緩存對象的實際類型都是它的子類。它在創(chuàng)建時使用了反射并遵循簡單工廠的編碼風格:

interface LocalCacheFactory {
    static  BoundedLocalCache newBoundedLocalCache(Caffeine builder,
                                                               @Nullable AsyncCacheLoader cacheLoader, boolean async) {
        var className = getClassName(builder);
        var factory = loadFactory(className);
        try {
            return factory.newInstance(builder, cacheLoader, async);
        } catch (RuntimeException | Error e) {
            throw e;
        } catch (Throwable t) {
            throw new IllegalStateException(className, t);
        }
    }
}

getClassName 方法非常有意思,它會根據(jù)緩存配置的屬性動態(tài)拼接出實際緩存類名:

interface LocalCacheFactory {

    static String getClassName(Caffeine builder) {
        var className = new StringBuilder();
        // key 是強引用或弱引用
        if (builder.isStrongKeys()) {
            className.append('S');
        } else {
            className.append('W');
        }
        // value 是強引用或弱引用
        if (builder.isStrongValues()) {
            className.append('S');
        } else {
            className.append('I');
        }
        // 配置了移除監(jiān)聽器
        if (builder.removalListener != null) {
            className.append('L');
        }
        // 配置了統(tǒng)計功能
        if (builder.isRecordingStats()) {
            className.append('S');
        }
        // 不同的驅(qū)逐策略
        if (builder.evicts()) {
            // 基于最大值限制,可能是最大權重W,也可能是最大容量S
            className.append('M');
            // 基于權重或非權重
            if (builder.isWeighted()) {
                className.append('W');
            } else {
                className.append('S');
            }
        }
        // 配置了訪問過期或可變過期策略
        if (builder.expiresAfterAccess() || builder.expiresVariable()) {
            className.append('A');
        }
        // 配置了寫入過期策略
        if (builder.expiresAfterWrite()) {
            className.append('W');
        }
        // 配置了刷新策略
        if (builder.refreshAfterWrite()) {
            className.append('R');
        }
        return className.toString();
    }
}

這也就是為什么能在 com.github.benmanes.caffeine.cache 包路徑下能發(fā)現(xiàn)很多類似 SSMS 只有簡稱命名的類的原因(下圖只截取部分,實際上有很多):

wKgZO2iRqV6AOqckAAAUJgkJ52Y634.png

根據(jù)代碼邏輯,它的命名遵循如下格式 S|W S|I [L] [S] [MW|MS] [A] [W] [R] 其中 [] 表示選填,| 表示某配置不同選擇的分隔符,結(jié)合注釋能清楚的了解各個位置字母簡稱表達的含義。如此定義實現(xiàn)類使用了 多級繼承,盡可能多地復用代碼。

以我們測試用例中創(chuàng)建的緩存類型為例,它對應的實現(xiàn)類為 SSMS,表示 key 和 value 均為強引用,并配置了非權重的最大緩存大小限制,類圖關系如下:

wKgZO2iRqWCAc5TlAAUDX4Iv_CE796.png

雖然在一些軟件設計相關的書籍中強調(diào)“多用組合,少用繼承”,但是這里使用多級繼承我覺得并沒有增加開發(fā)者的理解難度,反而了解了它的命名規(guī)則后,能更清晰的理解各個緩存所表示的含義,更好地實現(xiàn)代碼復用。

執(zhí)行 SSMS 的構(gòu)造方法會有以下邏輯:

// 1
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef
        implements LocalCache {

    static final int WRITE_BUFFER_MIN = 4;
    static final int WRITE_BUFFER_MAX = 128 * ceilingPowerOfTwo(NCPU);

    static final long MAXIMUM_CAPACITY = Long.MAX_VALUE - Integer.MAX_VALUE;

    static final double PERCENT_MAIN = 0.99d;
    static final double PERCENT_MAIN_PROTECTED = 0.80d;

    static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d;

    final @Nullable RemovalListener evictionListener;
    final @Nullable AsyncCacheLoader cacheLoader;

    final MpscGrowableArrayQueue writeBuffer;
    final ConcurrentHashMap> data;
    final PerformCleanupTask drainBuffersTask;
    final Consumer> accessPolicy;
    final Buffer> readBuffer;
    final NodeFactory nodeFactory;
    final ReentrantLock evictionLock;
    final Weigher weigher;
    final Executor executor;

    final boolean isAsync;
    final boolean isWeighted;

    protected BoundedLocalCache(Caffeine builder,
                                @Nullable AsyncCacheLoader cacheLoader, boolean isAsync) {
        // 標記同步或異步
        this.isAsync = isAsync;
        // 指定 cacheLoader 
        this.cacheLoader = cacheLoader;
        // 指定用于執(zhí)行驅(qū)逐元素、刷新緩存等任務的線程池,不指定默認為 ForkJoinPool.commonPool()
        executor = builder.getExecutor();
        // 標記是否定義了節(jié)點計算權重的 Weigher 對象
        isWeighted = builder.isWeighted();
        // 同步鎖,在接下來的內(nèi)容中會看到很多標記了 @GuardedBy("evictionLock") 注解的方法,表示這行這些方法時都會獲取這把同步鎖
        // 根據(jù)該鎖的命名,eviction 表示驅(qū)逐的意思,也就是說關注驅(qū)逐策略執(zhí)行的方法都要獲取該鎖,這一點需要在后文中注意
        evictionLock = new ReentrantLock();
        // 計算元素權重的對象,不指定為 SingletonWeigher.INSTANCE
        weigher = builder.getWeigher(isAsync);
        // 執(zhí)行緩存 maintenance 方法的任務,在后文中具體介紹
        drainBuffersTask = new PerformCleanupTask(this);
        // 創(chuàng)建節(jié)點的工廠
        nodeFactory = NodeFactory.newFactory(builder, isAsync);
        // 驅(qū)逐監(jiān)聽器,有元素被驅(qū)逐時會回調(diào)
        evictionListener = builder.getEvictionListener(isAsync);
        // 用于保存所有數(shù)據(jù)的 ConcurrentHashMap
        data = new ConcurrentHashMap(builder.getInitialCapacity());
        // 如果指定驅(qū)逐策略 或 key為弱引用 或 value為弱引用或軟引用 或 訪問后過期則創(chuàng)建 readBuffer,否則它為不可用狀態(tài)
        // readBuffer 用于記錄某些被訪問過的節(jié)點
        readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess()
                ? new BoundedBuffer() : Buffer.disabled();
        // 如果指定了驅(qū)逐策略 或 訪問后過期策略則會定義訪問策略,執(zhí)行 onAccess 方法,后文詳細介紹
        accessPolicy = (evicts() || expiresAfterAccess()) ? this::onAccess : e -> {};
        // 初始化最大值和最小值的雙端隊列作為 writeBuffer,用于記錄一些寫后操作任務 
        writeBuffer = new MpscGrowableArrayQueue(WRITE_BUFFER_MIN, WRITE_BUFFER_MAX);

        // 執(zhí)行了驅(qū)逐策略則更新最大容量限制
        if (evicts()) {
            setMaximumSize(builder.getMaximum());
        }
    }

    @GuardedBy("evictionLock")
    void setMaximumSize(long maximum) {
        requireArgument(maximum >= 0, "maximum must not be negative");
        if (maximum == maximum()) {
            return;
        }

        // 不能超過最大容量
        long max = Math.min(maximum, MAXIMUM_CAPACITY);
        // 計算窗口區(qū)大小
        long window = max - (long) (PERCENT_MAIN * max);
        // 計算保護區(qū)大小
        long mainProtected = (long) (PERCENT_MAIN_PROTECTED * (max - window));

        // 記錄這些值
        setMaximum(max);
        setWindowMaximum(window);
        setMainProtectedMaximum(mainProtected);

        // 標記命中量、非命中量并初始化步長值,這三個值用于后續(xù)動態(tài)調(diào)整保護區(qū)和窗口區(qū)大小
        setHitsInSample(0);
        setMissesInSample(0);
        setStepSize(-HILL_CLIMBER_STEP_PERCENT * max);

        // 直到當前緩存的權重(大小)接近最大值一半時才初始化頻率草圖
        if ((frequencySketch() != null) && !isWeighted() && (weightedSize() >= (max >>> 1))) {
            frequencySketch().ensureCapacity(max);
        }
    }
}

// 2
class SS extends BoundedLocalCache {
    static final LocalCacheFactory FACTORY = SS::new;

    // key value 強引用無需特殊操作
    SS(Caffeine var1, @Nullable AsyncCacheLoader var2, boolean var3) {
        super(var1, var2, var3);
    }
}

// 3
class SSMS extends SS {

    // 頻率草圖,后文具體介紹
    final FrequencySketch sketch = new FrequencySketch();

    final AccessOrderDeque> accessOrderWindowDeque;
    final AccessOrderDeque> accessOrderProbationDeque;
    final AccessOrderDeque> accessOrderProtectedDeque;

    SSMS(Caffeine var1, @Nullable AsyncCacheLoader var2, boolean var3) {
        super(var1, var2, var3);
        // 如果 Caffeine 初始化了容量則確定頻率草圖的容量
        if (var1.hasInitialCapacity()) {
            long var4 = Math.min(var1.getMaximum(), (long) var1.getInitialCapacity());
            this.sketch.ensureCapacity(var4);
        }

        // 初始化窗口區(qū)、試用區(qū)和保護區(qū),它們都是雙端隊列(鏈表實現(xiàn))
        this.accessOrderWindowDeque = !var1.evicts() && !var1.expiresAfterAccess() ? null : new AccessOrderDeque();
        this.accessOrderProbationDeque = new AccessOrderDeque();
        this.accessOrderProtectedDeque = new AccessOrderDeque();
    }
}

在步驟 1 中定義了三個區(qū)的初始化大小為 1%|19%|80%,這樣配置的性能相對較好。此外,我們還需要解釋一下 weightedSize() 方法,它用于訪問 long weightedSize 變量。根據(jù)其命名有“權重大小”的含義,在默認不指定權重計算對象 Weigher 的情況下,Weigher 默認為 SingletonWeigher.INSTANCE 表示每個元素的權重大小為 1,如下:

enum SingletonWeigher implements Weigher {
    INSTANCE;

    @Override
    public int weigh(Object key, Object value) {
        return 1;
    }
}

這樣 weightedSize 表示的便是當前緩存中元素數(shù)量。如果自定義了 Weigher 那么 weightedSize 表示的便是緩存中總權重大小,每個元素的權重則可能會不同。因為在示例中我們并沒有指定 Weigher,所以在此處可以將 weightedSize 理解為當前緩存大小。

上文中我們提到緩存的定義遵循大寫字母縮寫的命名規(guī)則,實際上節(jié)點類的定義也采用了這種方式,在創(chuàng)建節(jié)點工廠 NodeFactory.newFactory(builder, isAsync)
的邏輯中,它會執(zhí)行如下邏輯,根據(jù)緩存的類型來確定它的節(jié)點類型,命名遵循 P|F S|W|D A|AW|W| [R] [MW|MS] 的規(guī)則,同樣使用了反射機制和簡單工廠的編碼風格,如下:

interface NodeFactory {
    // ...

    static  NodeFactory newFactory(Caffeine builder, boolean isAsync) {
        if (builder.interner) {
            return (NodeFactory) Interned.FACTORY;
        }
        var className = getClassName(builder, isAsync);
        return loadFactory(className);
    }

    static String getClassName(Caffeine builder, boolean isAsync) {
        var className = new StringBuilder();
        // key 強引用或弱引用
        if (builder.isStrongKeys()) {
            className.append('P');
        } else {
            className.append('F');
        }
        // value 強引用或弱引用或軟引用
        if (builder.isStrongValues()) {
            className.append('S');
        } else if (builder.isWeakValues()) {
            className.append('W');
        } else {
            className.append('D');
        }
        // 過期策略
        if (builder.expiresVariable()) {
            if (builder.refreshAfterWrite()) {
                // 訪問后過期
                className.append('A');
                if (builder.evicts()) {
                    // 寫入后過期
                    className.append('W');
                }
            } else {
                className.append('W');
            }
        } else {
            // 訪問后過期
            if (builder.expiresAfterAccess()) {
                className.append('A');
            }
            // 寫入后過期
            if (builder.expiresAfterWrite()) {
                className.append('W');
            }
        }
        // 寫入后刷新
        if (builder.refreshAfterWrite()) {
            className.append('R');
        }
        // 驅(qū)逐策略
        if (builder.evicts()) {
            // 默認最大大小限制
            className.append('M');
            // 加權
            if (isAsync || (builder.isWeighted() && (builder.weigher != Weigher.singletonWeigher()))) {
                className.append('W');
            } else {
                // 非加權
                className.append('S');
            }
        }
        return className.toString();
    }

}

SSMS 類型緩存對應的節(jié)點類型為 PSMS。

FrequencySketch

接下來,我們需要具體介紹下 FrequencySketch,它在上述構(gòu)造方法的步驟 3 中被創(chuàng)建。這個類的實現(xiàn)采用了 Count-Min Sketch 數(shù)據(jù)結(jié)構(gòu),它維護了一個 long[] table 一維數(shù)組,每個元素有 64 位,每 4 位作為一個計數(shù)器(這也就限定了最大頻率為 15),那么數(shù)組中每個槽位便是 16 個計數(shù)器。通過哈希函數(shù)取 4 個獨立的計數(shù)值,將其中的最小值作為元素的訪問頻率。table 的初始大小為緩存最大容量最接近的 2 的 n 次冪,并在計算哈希值時使用 blockMask 掩碼來使哈希結(jié)果均勻分布,保證了獲取元素訪問頻率的正確率為 93.75%,達到空間與時間的平衡。它的實現(xiàn)原理和布隆過濾器類似,犧牲了部分準確性,但減少了占用內(nèi)存的大小。如下圖所示為計算元素 e 的訪問頻率:

wKgZPGiRqWGANlxcAAqrCLf4x8I995.png

以下為 FrequencySketch 的源碼,關注注釋即可,并不復雜:

final class FrequencySketch {

    static final long RESET_MASK = 0x7777777777777777L;
    static final long ONE_MASK = 0x1111111111111111L;

    // 采樣大小,用于控制 reset
    int sampleSize;
    // 掩碼,用于均勻分散哈希結(jié)果
    int blockMask;
    long[] table;
    int size;

    public FrequencySketch() {
    }

    public void ensureCapacity(@NonNegative long maximumSize) {
        requireArgument(maximumSize >= 0);
        // 取緩存最大容量和 Integer.MAX_VALUE >>> 1 中的小值 
        int maximum = (int) Math.min(maximumSize, Integer.MAX_VALUE >>> 1);
        // 如果已經(jīng)被初始化過并且 table 長度大于等于最大容量,那么不進行操作
        if ((table != null) && (table.length >= maximum)) {
            return;
        }

        // 初始化 table,長度為最接近 maximum 的 2的n次冪 和 8 中的大值
        table = new long[Math.max(Caffeine.ceilingPowerOfTwo(maximum), 8)];
        // 計算采樣大小
        sampleSize = (maximumSize == 0) ? 10 : (10 * maximum);
        // 計算掩碼
        blockMask = (table.length >>> 3) - 1;
        // 特殊判斷
        if (sampleSize <= 0) {
            sampleSize = Integer.MAX_VALUE;
        }
        // 計數(shù)器總數(shù)
        size = 0;
    }

    @NonNegative
    public int frequency(E e) {
        // 如果緩存沒有被初始化則返回頻率為 0
        if (isNotInitialized()) {
            return 0;
        }

        // 創(chuàng)建 4 個元素的數(shù)組 count 用于保存 4 次 hash 計算出的頻率值
        int[] count = new int[4];
        // hash 擾動,使結(jié)果均勻分布
        int blockHash = spread(e.hashCode());
        // 重 hash,進一步分散結(jié)果
        int counterHash = rehash(blockHash);
        // 根據(jù)掩碼計算對應的塊索引
        int block = (blockHash & blockMask) >> (i >> 1) & 15;
            // 計算計數(shù)器的偏移量
            int offset = h & 1;
            // 定位到 table 中某個槽位后右移并進行位與運算得到最低的 4 位的值(0xfL 為二進制的 1111)
            count[i] = (int) ((table[block + offset + (i >> (index >> (i >> 1) & 15;
            int offset = h & 1;
            // i + 4 記錄元素所在 table 中的索引
            index[i + 4] = block + offset + (i >> 1) & RESET_MASK;
        }
        // count >>> 2 表示計數(shù)器個數(shù),計算重置后的 size
        size = (size - (count >>> 2)) >>> 1;
    }

    static int spread(int x) {
        x ^= x >>> 17;
        x *= 0xed5ad4bb;
        x ^= x >>> 11;
        x *= 0xac4c1b51;
        x ^= x >>> 15;
        return x;
    }

    static int rehash(int x) {
        x *= 0x31848bab;
        x ^= x >>> 14;
        return x;
    }

}

到這里,Caffeine 緩存的基本數(shù)據(jù)結(jié)構(gòu)全貌已經(jīng)展現(xiàn)出來了,如下所示,在后文中我們再具體關注它們之間是如何協(xié)同的。

wKgZO2iRqWOAGD6FAAyQ2GkyIYU736.png

put

接下來繼續(xù)了解向緩存中添加元素的流程,本節(jié)內(nèi)容比較多,理解起來也相對復雜,結(jié)合文章內(nèi)容的同時,也需要多去深入查看 Caffeine 源碼才能有更好的理解,以下為 put 方法的源碼:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    // 默認入?yún)?onlyIfAbsent 為 false,表示向緩存中添加相同的 key 會對 value 進行替換 
    @Override
    public @Nullable V put(K key, V value) {
        return put(key, value, expiry(), /* onlyIfAbsent */ false);
    }
}

它會執(zhí)行到如下具體邏輯中,關注注釋信息:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    static final int WRITE_BUFFER_RETRIES = 100;

    final MpscGrowableArrayQueue writeBuffer;

    final ConcurrentHashMap> data;

    final ReentrantLock evictionLock;

    final NodeFactory nodeFactory;

    @Nullable
    V put(K key, V value, Expiry expiry, boolean onlyIfAbsent) {
        // 不允許添加 null
        requireNonNull(key);
        requireNonNull(value);

        Node node = null;
        // 獲取當前時間戳
        long now = expirationTicker().read();
        // 計算緩存權重,如果沒有指定 weigher 的話,默認權重為 1
        int newWeight = weigher.weigh(key, value);
        // 創(chuàng)建用于查找的鍵對象
        Object lookupKey = nodeFactory.newLookupKey(key);
        
        for (int attempts = 1; ; attempts++) {
            // 嘗試獲取節(jié)點;prior 譯為先前的;較早的
            Node prior = data.get(lookupKey);
            // 處理不存在的節(jié)點
            if (prior == null) {
                // 如果 node 在循環(huán)執(zhí)行中還未被創(chuàng)建
                if (node == null) {
                    // NodeFactory 創(chuàng)建對應類型節(jié)點
                    node = nodeFactory.newNode(key, keyReferenceQueue(), value, valueReferenceQueue(), newWeight, now);
                    // 設置節(jié)點的過期時間
                    setVariableTime(node, expireAfterCreate(key, value, expiry, now));
                }
                // 嘗試添加新節(jié)點到緩存中,如果鍵已存在則返回現(xiàn)有節(jié)點
                prior = data.putIfAbsent(node.getKeyReference(), node);
                // 返回 null 表示插入成功
                if (prior == null) {
                    // 寫后操作:添加 AddTask 并調(diào)度執(zhí)行任務
                    afterWrite(new AddTask(node, newWeight));
                    return null;
                }
                // onlyIfAbsent 形參在默認的 put 方法中為 false,以下邏輯簡單介紹
                // 如果此時有其他線程添加了相同 key 的元素
                else if (onlyIfAbsent) {
                    // 獲取到當前值,嘗試判斷讀后失效策略,更新訪問時間,并執(zhí)行讀后操作 afterRead 方法
                    V currentValue = prior.getValue();
                    if ((currentValue != null) && !hasExpired(prior, now)) {
                        if (!isComputingAsync(prior)) {
                            tryExpireAfterRead(prior, key, currentValue, expiry(), now);
                            setAccessTime(prior, now);
                        }
                        // 讀后操作,該方法在 getIfPresent 中進行講解
                        afterRead(prior, now, /* recordHit */ false);
                        return currentValue;
                    }
                }
            } else if (onlyIfAbsent) {
                // 同樣的邏輯
                V currentValue = prior.getValue();
                if ((currentValue != null) && !hasExpired(prior, now)) {
                    if (!isComputingAsync(prior)) {
                        tryExpireAfterRead(prior, key, currentValue, expiry(), now);
                        setAccessTime(prior, now);
                    }
                    afterRead(prior, now, /* recordHit */ false);
                    return currentValue;
                }
            }
        }
        // ...
    }
}

注意添加節(jié)點成功的邏輯,它會執(zhí)行 afterWrite 寫后操作方法,添加 AddTask 任務到 writeBuffer 中:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    // 寫重試最多 100 次
    static final int WRITE_BUFFER_RETRIES = 100;

    static final int WRITE_BUFFER_MIN = 4;
    static final int WRITE_BUFFER_MAX = 128 * ceilingPowerOfTwo(NCPU);

    final MpscGrowableArrayQueue writeBuffer = new MpscGrowableArrayQueue(WRITE_BUFFER_MIN, WRITE_BUFFER_MAX);

    // 添加寫后 Task 到 writeBuffer 中并在合適的時機調(diào)度執(zhí)行任務
    void afterWrite(Runnable task) {
        // 最多重試添加 100 次
        for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
            if (writeBuffer.offer(task)) {
                // 寫后調(diào)度
                scheduleAfterWrite();
                return;
            }
            // 向 writeBuffer 中添加任務失敗會調(diào)度任務執(zhí)行
            scheduleDrainBuffers();
            // 自旋等待,讓出 CPU 控制權
            Thread.onSpinWait();
        }
        // ...
    }
}

writeBuffer 的類型為 MpscGrowableArrayQueue,在這里我們詳細的介紹下它。

WriteBuffer

根據(jù)它的命名 GrowableArrayQueue 可知它是一個容量可以增長的雙端隊列,前綴 MPSC 表達的含義是“多生產(chǎn)者,單消費者”,也就是說可以有多個線程向其中添加元素,但只有一個線程能從其中獲取元素。那么它是如何實現(xiàn) MPSC 的呢?接下來我們就根據(jù)源碼詳細了解一下。首先先來看一下它的類繼承關系圖及簡要說明:

wKgZPGiRqWaAKxtEAArX0SqWOsQ801.png

圖中灰色的表示抽象類,藍色為實現(xiàn)類,java.util.AbstractQueue 就不再多解釋了。我們先看看其中標記紅框的類,討論到底什么是“避免內(nèi)存?zhèn)喂蚕韱栴}”?

以 BaseMpscLinkedArrayQueuePad1 為例:

abstract class BaseMpscLinkedArrayQueuePad1 extends AbstractQueue {
    byte p000, p001, p002, p003, p004, p005, p006, p007;
    byte p008, p009, p010, p011, p012, p013, p014, p015;
    byte p016, p017, p018, p019, p020, p021, p022, p023;
    byte p024, p025, p026, p027, p028, p029, p030, p031;
    byte p032, p033, p034, p035, p036, p037, p038, p039;
    byte p040, p041, p042, p043, p044, p045, p046, p047;
    byte p048, p049, p050, p051, p052, p053, p054, p055;
    byte p056, p057, p058, p059, p060, p061, p062, p063;
    byte p064, p065, p066, p067, p068, p069, p070, p071;
    byte p072, p073, p074, p075, p076, p077, p078, p079;
    byte p080, p081, p082, p083, p084, p085, p086, p087;
    byte p088, p089, p090, p091, p092, p093, p094, p095;
    byte p096, p097, p098, p099, p100, p101, p102, p103;
    byte p104, p105, p106, p107, p108, p109, p110, p111;
    byte p112, p113, p114, p115, p116, p117, p118, p119;
}

這個類除了定義了 120 字節(jié)的字段外,看上去沒有做其他任何事情,實際上它為 性能提升 默默做出了貢獻,避免了內(nèi)存?zhèn)喂蚕?/strong>。CPU 中緩存行(Cache Line)的大小通常是 64 字節(jié),在類中定義 120 字節(jié)來占位,這樣便能將上下繼承關系間的字段間隔開,保證被多個線程訪問的關鍵字段距離至少跨越一個緩存行,分布在不同的緩存行中。這樣在不同的線程訪問 BaseMpscLinkedArrayQueueProducerFields 和 BaseMpscLinkedArrayQueueConsumerFields 中字段時互不影響,詳細了解原理可參考博客園 - CPU Cache與緩存行。

接下來我們看看其他抽象類的作用。BaseMpscLinkedArrayQueueProducerFields 定義生產(chǎn)者相關字段:

abstract class BaseMpscLinkedArrayQueueProducerFields extends BaseMpscLinkedArrayQueuePad1 {
    // 生產(chǎn)者操作索引(并不對應緩沖區(qū) producerBuffer 中索引位置)
    protected long producerIndex;
}

BaseMpscLinkedArrayQueueConsumerFields 負責定義消費者相關字段:

abstract class BaseMpscLinkedArrayQueueConsumerFields extends BaseMpscLinkedArrayQueuePad2 {
    // 掩碼值,用于計算消費者實際的索引位置
    protected long consumerMask;
    // 消費者訪問這個緩沖區(qū)來獲取元素消費
    protected E[] consumerBuffer;
    // 消費者操作索引(并不對應緩沖區(qū) consumerBuffer 中索引位置)
    protected long consumerIndex;
}

BaseMpscLinkedArrayQueueColdProducerFields 中定義字段如下,該類的命名包含 Cold,表示其中字段被修改的次數(shù)會比較少:

abstract class BaseMpscLinkedArrayQueueColdProducerFields extends BaseMpscLinkedArrayQueuePad3 {
    // 生產(chǎn)者可以操作的最大索引上限
    protected volatile long producerLimit;
    // 掩碼值,用于計算生產(chǎn)者在數(shù)組中實際的索引
    protected long producerMask;
    // 存儲生產(chǎn)者生產(chǎn)的元素
    protected E[] producerBuffer;
}

現(xiàn)在關鍵字段我們已經(jīng)介紹完了,接下來看一下創(chuàng)建 MpscGrowableArrayQueue 的邏輯,執(zhí)行它的構(gòu)造方法時會為我們剛剛提到的字段進行賦值:

class MpscGrowableArrayQueue extends MpscChunkedArrayQueue {

    MpscGrowableArrayQueue(int initialCapacity, int maxCapacity) {
        // 調(diào)用父類的構(gòu)造方法
        super(initialCapacity, maxCapacity);
    }
}

abstract class MpscChunkedArrayQueue extends MpscChunkedArrayQueueColdProducerFields {
    // 省略字節(jié)占位字段...
    byte p119;

    MpscChunkedArrayQueue(int initialCapacity, int maxCapacity) {
        // 調(diào)用父類的構(gòu)造方法
        super(initialCapacity, maxCapacity);
    }

}

abstract class MpscChunkedArrayQueueColdProducerFields extends BaseMpscLinkedArrayQueue {
    protected final long maxQueueCapacity;

    MpscChunkedArrayQueueColdProducerFields(int initialCapacity, int maxCapacity) {
        // 調(diào)用父類的構(gòu)造方法
        super(initialCapacity);
        if (maxCapacity < 4) {
            throw new IllegalArgumentException("Max capacity must be 4 or more");
        }
        // 保證了最大值最少比初始值大 2 倍
        if (ceilingPowerOfTwo(initialCapacity) >= ceilingPowerOfTwo(maxCapacity)) {
            throw new IllegalArgumentException(
                    "Initial capacity cannot exceed maximum capacity(both rounded up to a power of 2)");
        }
        // 最大容量也為 2的n次冪
        maxQueueCapacity = ((long) ceilingPowerOfTwo(maxCapacity))  extends BaseMpscLinkedArrayQueueColdProducerFields {

    BaseMpscLinkedArrayQueue(final int initialCapacity) {
        if (initialCapacity < 2) {
            throw new IllegalArgumentException("Initial capacity must be 2 or more");
        }

        // 初始化緩沖區(qū)大小為數(shù)值最接近的 2 的 n 次冪
        int p2capacity = ceilingPowerOfTwo(initialCapacity);
        // 掩碼值,-1L 使其低位均為 1,左移 1 位則最低位為 0,eg: 00000110,注意該值會被生產(chǎn)者和消費者掩碼值共同賦值
        long mask = (p2capacity - 1L) 

現(xiàn)在 MpscGrowableArrayQueue 的構(gòu)建已經(jīng)看完了,了解了其中關鍵字段的賦值,現(xiàn)在我們就需要看它是如何實現(xiàn) MPSC 的?!岸嗌a(chǎn)者”也就意味著會有多個線程向其中添加元素,既然是多線程就需要重點關注它是如何在多線程間完成協(xié)同的。添加操作對應了 BaseMpscLinkedArrayQueue#offer 方法,它的實現(xiàn)如下:

abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields {

    private static final Object JUMP = new Object();

    @Override
    @SuppressWarnings("MissingDefault")
    public boolean offer(final E e) {
        if (e == null) {
            throw new NullPointerException();
        }

        long mask;
        E[] buffer;
        long pIndex;

        while (true) {
            // 生產(chǎn)者最大索引(生產(chǎn)者掩碼值),獲取 BaseMpscLinkedArrayQueueColdProducerFields 中定義的該字段
            long producerLimit = lvProducerLimit();
            // 生產(chǎn)者當前索引,初始值為 0,BaseMpscLinkedArrayQueueProducerFields 中字段 
            pIndex = lvProducerIndex(this);
            // producerIndex 最低位用來表示擴容(索引生產(chǎn)者索引 producerIndex 并不對應緩沖區(qū)中實際的索引)
            // 低位為 1 表示正在擴容,自旋等待直到擴容完成(表示只有一個線程操作擴容)
            if ((pIndex & 1) == 1) {
                continue;
            }

            // 掩碼值和buffer可能在擴容中被改變,每次循環(huán)使用最新值
            mask = this.producerMask;
            buffer = this.producerBuffer;

            // 檢查是否需要擴容
            if (producerLimit <= pIndex) {
                int result = offerSlowPath(mask, pIndex, producerLimit);
                switch (result) {
                    case 0:
                        break;
                    case 1:
                        continue;
                    case 2:
                        return false;
                    case 3:
                        resize(mask, buffer, pIndex, e);
                        return true;
                }
            }

            // CAS 操作更新生產(chǎn)者索引,注意這里是 +2,更新成功結(jié)束循環(huán)
            if (casProducerIndex(this, pIndex, pIndex + 2)) {
                break;
            }
        }
        // 計算該元素在 buffer 中的實際偏移量,并將其添加到緩沖區(qū)中
        final long offset = modifiedCalcElementOffset(pIndex, mask);
        soElement(buffer, offset, e);
        return true;
    }

    // 沒有將 resize 邏輯封裝在該方法中,而是由該方法判斷是否需要擴容
    private int offerSlowPath(long mask, long pIndex, long producerLimit) {
        int result;
        // 獲取消費者索引 BaseMpscLinkedArrayQueueConsumerFields 類中
        final long cIndex = lvConsumerIndex(this);
        // 通過掩碼值計算當前緩沖區(qū)容量
        long bufferCapacity = getCurrentBufferCapacity(mask);
        result = 0;
        // 如果隊列還有空間
        if (cIndex + bufferCapacity > pIndex) {
            // 嘗試更新生產(chǎn)者最大限制,更新失敗則返回 1 重試
            if (!casProducerLimit(this, producerLimit, cIndex + bufferCapacity)) {
                result = 1;
            }
        }
        // 如果隊列已滿且無法擴展
        else if (availableInQueue(pIndex, cIndex) <= 0) {
            result = 2;
        }
        // 更新 producerIndex 最低位為 1,成功則進行擴容,否則重試
        else if (casProducerIndex(this, pIndex, pIndex + 1)) {
            result = 3;
        } else {
            result = 1;
        }
        return result;
    }

    private void resize(long oldMask, E[] oldBuffer, long pIndex, final E e) {
        // 計算新緩沖區(qū)大小并創(chuàng)建,2 * (buffer.length - 1) + 1
        int newBufferLength = getNextBufferSize(oldBuffer);
        final E[] newBuffer = allocate(newBufferLength);

        // 更新緩沖區(qū)引用為新的緩沖區(qū)
        producerBuffer = newBuffer;
        // 更新新的掩碼
        final int newMask = (newBufferLength - 2) > 1;
    }
}

可見,在這個過程中它并沒有限制操作線程數(shù)量,也沒有使用加鎖的同步機制。它通過保證 可見性,并使用 自旋鎖結(jié)合 CAS 操作 更新生產(chǎn)者索引值,因為該操作是原子的,同時只有一個線程能更新獲取索引值成功,更新失敗的線程會自旋重試,這樣便允許多線程同時添加元素,可見性保證和CAS操作源碼如下:

abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields {

    static final VarHandle P_INDEX = pIndexLookup.findVarHandle(
            BaseMpscLinkedArrayQueueProducerFields.class, "producerIndex", long.class);
    
    // volatile 可見性保證
    static long lvProducerIndex(BaseMpscLinkedArrayQueue self) {
        return (long) P_INDEX.getVolatile(self);
    }
    
    // CAS 操作
    static boolean casProducerIndex(BaseMpscLinkedArrayQueue self, long expect, long newValue) {
        return P_INDEX.compareAndSet(self, expect, newValue);
    }
}

保證可見性(內(nèi)存操作對其他線程可見)的原理是 內(nèi)存屏障,除了保證可見性以外,內(nèi)存屏障還能夠 防止重排序(確保在內(nèi)存屏障前后的內(nèi)存操作不會被重排序,從而保證程序的正確性)。到這里,生產(chǎn)者添加元素的邏輯我們已經(jīng)分析完了,接下來我們需要繼續(xù)看一下消費者獲取元素的邏輯,它對應了 BaseMpscLinkedArrayQueue#poll 方法,同樣地,在這過程中需要關注“在這個方法中有沒有限制單一線程執(zhí)行”,以此實現(xiàn)單消費者呢:

abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields {
    
    private static final Object JUMP = new Object();
    
    public E poll() {
        // 讀取消費者相關字段 BaseMpscLinkedArrayQueueConsumerFields 類
        final E[] buffer = consumerBuffer;
        final long index = consumerIndex;
        final long mask = consumerMask;

        // 根據(jù)消費索引,計算出元素在消費者緩沖區(qū)中實際的位置
        final long offset = modifiedCalcElementOffset(index, mask);
        // 讀取該元素(volatile 可見性讀?。?        Object e = lvElement(buffer, offset);
        
        // 如果為空
        if (e == null) {
            // 比較生產(chǎn)者索引,如果兩個索引不相等,那么證明兩索引間存在距離表示還有元素能夠被消費
            if (index != lvProducerIndex(this)) {
                // 自旋讀取元素,直到讀到元素
                do {
                    e = lvElement(buffer, offset);
                } while (e == null);
            } else {
                // 索引相等證明確實是空隊列
                return null;
            }
        }
        if (e == JUMP) {
            // 獲取到新緩沖區(qū)
            final E[] nextBuffer = getNextBuffer(buffer, mask);
            // 在新緩沖區(qū)中獲取到對應元素
            return newBufferPoll(nextBuffer, index);
        }
        // 清除當前索引的元素,表示該元素已經(jīng)被消費
        soElement(buffer, offset, null);
        // 更新消費者索引,這里也是 +2,它并不表示實際的在緩沖區(qū)的索引
        soConsumerIndex(this, index + 2);
        return (E) e;
    }

    private E[] getNextBuffer(final E[] buffer, final long mask) {
        // 如果已經(jīng)發(fā)生擴容,此時 consumerMask 仍然對應的是擴容前的 mask
        // 此處與生產(chǎn)者操作擴容時拼接新舊緩沖區(qū)調(diào)用的是一樣的方法,這樣便能夠獲取到新緩沖區(qū)的偏移量
        final long nextArrayOffset = nextArrayOffset(mask);
        // 獲取到新緩沖區(qū),因為在擴容操作時已經(jīng)將新緩沖區(qū)鏈接到舊緩沖區(qū)上了
        final E[] nextBuffer = (E[]) lvElement(buffer, nextArrayOffset);
        // 將舊緩沖區(qū)中新緩沖區(qū)位置設置為 null 表示舊緩沖區(qū)中已經(jīng)沒有任何元素需要被消費了,也不再需要被引用了(能被垃圾回收了)
        soElement(buffer, nextArrayOffset, null);
        return nextBuffer;
    }

    private long nextArrayOffset(final long mask) {
        return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE);
    }

    private E newBufferPoll(E[] nextBuffer, final long index) {
        // 計算出消費者操作索引在新緩沖區(qū)中對應的實際位置
        final long offsetInNew = newBufferAndOffset(nextBuffer, index);
        // 在新緩沖區(qū)中獲取到對應元素
        final E n = lvElement(nextBuffer, offsetInNew);
        if (n == null) {
            throw new IllegalStateException("new buffer must have at least one element");
        }
        // 清除當前索引的元素,表示該元素已經(jīng)被消費
        soElement(nextBuffer, offsetInNew, null);
        // 更新消費者索引
        soConsumerIndex(this, index + 2);
        return n;
    }

    private long newBufferAndOffset(E[] nextBuffer, final long index) {
        // 將消費者緩沖區(qū)引用和掩碼值更新
        consumerBuffer = nextBuffer;
        consumerMask = (nextBuffer.length - 2L) > 1;
    }
    
    static  E lvElement(E[] buffer, long offset) {
        return (E) REF_ARRAY.getVolatile(buffer, (int) offset);
    }
}

可以發(fā)現(xiàn)在該方法中并沒有限制單一線程執(zhí)行,所以理論上這個方法可能被多個線程調(diào)用,那么它又為什么被稱為 MPSC 呢?在這個方法中的一段注釋值得細心體會:

This implementation is correct for single consumer thread use only.
此實現(xiàn)僅適用于單消費者線程使用

所以調(diào)用該方法時開發(fā)者本身需要保證單線程調(diào)用而并不是在實現(xiàn)中控制。

到這里 MpscGrowableArrayQueue 中核心的邏輯已經(jīng)講解完了,現(xiàn)在我們回過頭來再看一下隊列擴容前后生產(chǎn)者和消費者是如何協(xié)同的?在擴容前,consumerBuffer 和 producerBuffer 引用的是同一個緩沖區(qū)對象。如果發(fā)生擴容,那么生產(chǎn)者會創(chuàng)建一個新的緩沖區(qū),并將 producerBuffer 引用指向它,此時它做了一個 非常巧妙 的操作,將 新緩沖區(qū)依然鏈接到舊緩沖區(qū) 上,并將觸發(fā)擴容的元素對應的舊緩沖區(qū)的索引處標記為 JUMP,表示這及之后的元素已經(jīng)都在新緩沖區(qū)中。此時,消費者依然會在舊緩沖區(qū)中慢慢地消費,直到遇到 JUMP 標志位,消費者就知道需要到新緩沖區(qū)中取獲取元素了。因為之前生產(chǎn)者在擴容時對新舊緩沖區(qū)進行鏈接,所以消費者能夠通過舊緩沖區(qū)獲取到新緩沖區(qū)的引用,并變更 consumerBuffer 的引用和 consumerMask 掩碼值,接下來的消費過程便和擴容前沒有差別了。

scheduleAfterWrite

現(xiàn)在我們再回到 put 方法的邏輯中,如果向 WriterBuffer 中添加元素成功,則會調(diào)用 scheduleAfterWrite 方法,調(diào)度任務的執(zhí)行:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    final ReentrantLock evictionLock = new ReentrantLock();
    // 默認為 ForkJoinPool.commonPool()
    final Executor executor;
    // 該任務在創(chuàng)建緩存時已經(jīng)完成初始化
    final PerformCleanupTask drainBuffersTask;
    
    // 根據(jù)狀態(tài)的變化來調(diào)度執(zhí)行任務
    void scheduleAfterWrite() {
        // 獲取當前 drainStatus,drain 譯為排空,耗盡
        int drainStatus = drainStatusOpaque();
        for (; ; ) {
            // 這里的狀態(tài)機變更需要關注下
            switch (drainStatus) {
                // IDLE 表示當前無任務可做
                case IDLE:
                    // CAS 更新狀態(tài)為 REQUIRED
                    casDrainStatus(IDLE, REQUIRED);
                    // 調(diào)度任務執(zhí)行
                    scheduleDrainBuffers();
                    return;
                // REQUIRED 表示當前有任務需要執(zhí)行
                case REQUIRED:
                    // 調(diào)度任務執(zhí)行
                    scheduleDrainBuffers();
                    return;
                // PROCESSING_TO_IDLE 表示當前任務處理完成后會變成 IDLE 狀態(tài)
                case PROCESSING_TO_IDLE:
                    // 又來了新的任務,則 CAS 操作將它更新為 PROCESSING_TO_REQUIRED 狀態(tài)
                    if (casDrainStatus(PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED)) {
                        return;
                    }
                    drainStatus = drainStatusAcquire();
                    continue;
                    // PROCESSING_TO_REQUIRED 表示正在處理任務,處理完任務后還有任務需要處理
                case PROCESSING_TO_REQUIRED:
                    return;
                default:
                    throw new IllegalStateException("Invalid drain status: " + drainStatus);
            }
        }
    }

    // 調(diào)度執(zhí)行緩沖區(qū)中的任務
    void scheduleDrainBuffers() {
        // 如果狀態(tài)表示正在有任務處理則返回
        if (drainStatusOpaque() >= PROCESSING_TO_IDLE) {
            return;
        }
        // 注意這里要獲取同步鎖 evictionLock
        if (evictionLock.tryLock()) {
            try {
                // 獲取鎖后再次校驗當前處理狀態(tài)
                int drainStatus = drainStatusOpaque();
                if (drainStatus >= PROCESSING_TO_IDLE) {
                    return;
                }
                // 更新狀態(tài)為 PROCESSING_TO_IDLE
                setDrainStatusRelease(PROCESSING_TO_IDLE);
                // 同步機制保證任何時刻只能有一個線程能夠提交任務
                executor.execute(drainBuffersTask);
            } catch (Throwable t) {
                logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
                maintenance(/* ignored */ null);
            } finally {
                evictionLock.unlock();
            }
        }
    }

}

寫后調(diào)度處理任務(scheduleAfterWrite)會根據(jù)狀態(tài)選擇性執(zhí)行 scheduleDrainBuffers 方法,執(zhí)行該方法時通過同步鎖 evictionLock 保證同時只有一個線程能提交 PerformCleanupTask 任務。這個任務在創(chuàng)建緩存時已經(jīng)被初始化完成了,每次提交任務都會被復用,接下來我們看一下這個任務的具體實現(xiàn):

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    // 可重用的任務,用于執(zhí)行 maintenance 方法,避免了使用 ForkJoinPool 來包裝
    static final class PerformCleanupTask extends ForkJoinTask implements Runnable {
        private static final long serialVersionUID = 1L;

        final WeakReference> reference;

        PerformCleanupTask(BoundedLocalCache cache) {
            reference = new WeakReference>(cache);
        }

        @Override
        public boolean exec() {
            try {
                run();
            } catch (Throwable t) {
                logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", t);
            }

            // Indicates that the task has not completed to allow subsequent submissions to execute
            return false;
        }

        @Override
        public void run() {
            // 獲取到緩存對象
            BoundedLocalCache cache = reference.get();
            if (cache != null) {
                cache.performCleanUp(null);
            }
        }
        // ...
    }
}

它的實現(xiàn)非常簡單,其中 reference 字段在調(diào)用構(gòu)造方法時被賦值,引用的是緩存對象本身。當任務被執(zhí)行時,調(diào)用的是 BoundedLocalCache#performCleanUp 方法:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    final ReentrantLock evictionLock = new ReentrantLock();
    
    // 執(zhí)行該任務時,也要獲取同步鎖,表示任務只能由一個線程來執(zhí)行
    void performCleanUp(@Nullable Runnable task) {
        evictionLock.lock();
        try {
            // 執(zhí)行維護任務
            maintenance(task);
        } finally {
            evictionLock.unlock();
        }
        rescheduleCleanUpIfIncomplete();
    }

    @GuardedBy("evictionLock")
    void maintenance(@Nullable Runnable task) {
        // 更新狀態(tài)為執(zhí)行中
        setDrainStatusRelease(PROCESSING_TO_IDLE);

        try {
            // 處理讀緩沖區(qū)中的任務
            drainReadBuffer();

            // 處理寫緩沖區(qū)中的任務
            drainWriteBuffer();
            if (task != null) {
                task.run();
            }

            // 處理 key 和 value 的引用
            drainKeyReferences();
            drainValueReferences();

            // 過期和驅(qū)逐策略
            expireEntries();
            evictEntries();

            // “增值” 操作,后續(xù)重點講
            climb();
        } finally {
            // 狀態(tài)不是 PROCESSING_TO_IDLE 或者無法 CAS 更新為 IDLE 狀態(tài)的話,需要更新狀態(tài)為 REQUIRED,該狀態(tài)會再次執(zhí)行維護任務
            if ((drainStatusOpaque() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
                setDrainStatusOpaque(REQUIRED);
            }
        }
    }
}

注意在執(zhí)行 performCleanUp 方法時,也需要獲取到同步鎖 evictionLock,那么任務的提交和任務的執(zhí)行也是互斥的。這個執(zhí)行的核心邏輯在 maintenance “維護”方法中,注意這個方法被標記了注解 @GuardedBy("evictionLock"),源碼中還有多個方法也標記了該注解,執(zhí)行這些方法時都要獲取同步鎖,這也是在提醒我們這些方法同時只有由一條線程被執(zhí)行。因為目前關注的是 put 方法,所以重點先看維護方法中 drainWriteBuffer 方法處理寫緩沖區(qū)中任務的邏輯:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    static final int NCPU = Runtime.getRuntime().availableProcessors();

    static final int WRITE_BUFFER_MAX = 128 * ceilingPowerOfTwo(NCPU);

    final MpscGrowableArrayQueue writeBuffer;

    @GuardedBy("evictionLock")
    void drainWriteBuffer() {
        // 最大循環(huán)次數(shù)為 writeBuffer 最大容量,直至彈出元素為 null
        for (int i = 0; i <= WRITE_BUFFER_MAX; i++) {
            Runnable task = writeBuffer.poll();
            if (task == null) {
                return;
            }
            task.run();
        }
        // 更新狀態(tài)為 PROCESSING_TO_REQUIRED
        setDrainStatusOpaque(PROCESSING_TO_REQUIRED);
    }
}

執(zhí)行邏輯非常簡單,在獲取到同步鎖之后,在 WriteBuffer 中獲取要被執(zhí)行的任務并執(zhí)行。在這里我們能發(fā)現(xiàn)“SC 單消費者”的實現(xiàn)使用 同步鎖的機制保證同時只能有一個消費者消費緩沖區(qū)中的任務。在上文中我們已經(jīng)知道,調(diào)用 put 方法時向緩沖區(qū) WriteBuffer 中添加的任務為 AddTask,下面我們看一下該任務的實現(xiàn):

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    static final long MAXIMUM_CAPACITY = Long.MAX_VALUE - Integer.MAX_VALUE;

    final class AddTask implements Runnable {
        final Node node;
        // 節(jié)點權重
        final int weight;

        AddTask(Node node, int weight) {
            this.weight = weight;
            this.node = node;
        }

        @Override
        @GuardedBy("evictionLock")
        @SuppressWarnings("FutureReturnValueIgnored")
        public void run() {
            // 是否指定了驅(qū)逐策略
            if (evicts()) {
                // 更新緩存權重和窗口區(qū)權重
                setWeightedSize(weightedSize() + weight);
                setWindowWeightedSize(windowWeightedSize() + weight);
                // 更新節(jié)點的 policyWeight,該字段只有在自定了權重計算規(guī)則時才有效
                // 否則像只定義了固定容量的驅(qū)逐策略,使用默認元素權重為 1 是不需要關注該字段的
                node.setPolicyWeight(node.getPolicyWeight() + weight);

                // 檢測當前總權重是否超過一半的最大容量
                long maximum = maximum();
                if (weightedSize() >= (maximum >>> 1)) {
                    // 如果超過最大容量
                    if (weightedSize() > MAXIMUM_CAPACITY) {
                        // 執(zhí)行驅(qū)逐操作
                        evictEntries();
                    } else {
                        // 延遲加載頻率草圖 frequencySketch 數(shù)據(jù)結(jié)構(gòu),用于統(tǒng)計元素訪問頻率
                        long capacity = isWeighted() ? data.mappingCount() : maximum;
                        frequencySketch().ensureCapacity(capacity);
                    }
                }

                // 更新頻率統(tǒng)計信息
                K key = node.getKey();
                if (key != null) {
                    // 因為頻率草圖數(shù)據(jù)結(jié)構(gòu)具有延遲加載機制(權重超過半數(shù))
                    // 所以實際上在元素權重還未過半未完成初始化時,調(diào)用 increment 是沒有作用的
                    frequencySketch().increment(key);
                }

                // 增加未命中樣本數(shù)
                setMissesInSample(missesInSample() + 1);
            }

            // 同步檢測節(jié)點是否還有效
            boolean isAlive;
            synchronized (node) {
                isAlive = node.isAlive();
            }
            if (isAlive) {
                // 寫后過期策略
                if (expiresAfterWrite()) {
                    writeOrderDeque().offerLast(node);
                }
                // 過期策略
                if (expiresVariable()) {
                    timerWheel().schedule(node);
                }
                // 驅(qū)逐策略
                if (evicts()) {
                    // 如果權重比配置的最大權重大
                    if (weight > maximum()) {
                        // 執(zhí)行固定權重(RemovalCause.SIZE)的驅(qū)逐策略
                        evictEntry(node, RemovalCause.SIZE, expirationTicker().read());
                    }
                    // 如果權重超過窗口區(qū)最大權重,則將其放在窗口區(qū)頭節(jié)點
                    else if (weight > windowMaximum()) {
                        accessOrderWindowDeque().offerFirst(node);
                    }
                    // 否則放在窗口區(qū)尾節(jié)點
                    else {
                        accessOrderWindowDeque().offerLast(node);
                    }
                }
                // 訪問后過期策略
                else if (expiresAfterAccess()) {
                    accessOrderWindowDeque().offerLast(node);
                }
            }

            // 處理異步計算
            if (isComputingAsync(node)) {
                synchronized (node) {
                    if (!Async.isReady((CompletableFuture) node.getValue())) {
                        long expirationTime = expirationTicker().read() + ASYNC_EXPIRY;
                        setVariableTime(node, expirationTime);
                        setAccessTime(node, expirationTime);
                        setWriteTime(node, expirationTime);
                    }
                }
            }
        }
    }
}

根據(jù)注釋很容易理解該方法的作用,因為我們目前對緩存只定義了固定容量的驅(qū)逐策略,所以我們需要在看一下 evictEntry 方法:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    final ConcurrentHashMap> data;
    
    @GuardedBy("evictionLock")
    @SuppressWarnings({"GuardedByChecker", "NullAway", "PMD.CollapsibleIfStatements"})
    boolean evictEntry(Node node, RemovalCause cause, long now) {
        K key = node.getKey();
        @SuppressWarnings("unchecked")
        V[] value = (V[]) new Object[1];
        boolean[] removed = new boolean[1];
        boolean[] resurrect = new boolean[1];
        Object keyReference = node.getKeyReference();
        RemovalCause[] actualCause = new RemovalCause[1];

        data.computeIfPresent(keyReference, (k, n) -> {
            if (n != node) {
                return n;
            }
            synchronized (n) {
                value[0] = n.getValue();

                // key 或 value 為 null,這種情況下可能使用了 Caffeine.weakKeys, Caffeine.weakValues, or Caffeine.softValues
                // 導致被垃圾回收了
                if ((key == null) || (value[0] == null)) {
                    // 標記實際失效原因為垃圾回收 
                    actualCause[0] = RemovalCause.COLLECTED;
                }
                // 如果原因為垃圾回收,記錄 resurrect 復活標記為 true
                else if (cause == RemovalCause.COLLECTED) {
                    resurrect[0] = true;
                    return n;
                }
                // 否則記錄入?yún)⒅械脑?                else {
                    actualCause[0] = cause;
                }

                // 過期驅(qū)逐策略判斷
                if (actualCause[0] == RemovalCause.EXPIRED) {
                    boolean expired = false;
                    if (expiresAfterAccess()) {
                        expired |= ((now - n.getAccessTime()) >= expiresAfterAccessNanos());
                    }
                    if (expiresAfterWrite()) {
                        expired |= ((now - n.getWriteTime()) >= expiresAfterWriteNanos());
                    }
                    if (expiresVariable()) {
                        expired |= (n.getVariableTime() <= now);
                    }
                    if (!expired) {
                        resurrect[0] = true;
                        return n;
                    }
                }
                // 固定容量驅(qū)逐策略
                else if (actualCause[0] == RemovalCause.SIZE) {
                    int weight = node.getWeight();
                    if (weight == 0) {
                        resurrect[0] = true;
                        return n;
                    }
                }

                // 通知驅(qū)逐策略監(jiān)聽器,調(diào)用它的方法
                notifyEviction(key, value[0], actualCause[0]);
                // 將該 key 對應的刷新策略失效
                discardRefresh(keyReference);
                // 標記該節(jié)點被驅(qū)逐
                removed[0] = true;
                // 退休準備被垃圾回收
                node.retire();
            }
            return null;
        });

        // 如果復活標記為 true 那么不被移除
        if (resurrect[0]) {
            return false;
        }

        // 節(jié)點已經(jīng)要被驅(qū)逐
        // 如果在窗口區(qū),那么直接從窗口區(qū)移除
        if (node.inWindow() && (evicts() || expiresAfterAccess())) {
            accessOrderWindowDeque().remove(node);
        }
        // 如果沒在窗口區(qū)
        else if (evicts()) {
            // 在試用區(qū)直接在試用區(qū)移除
            if (node.inMainProbation()) {
                accessOrderProbationDeque().remove(node);
            }
            // 在保護區(qū)則直接從保護區(qū)移除
            else {
                accessOrderProtectedDeque().remove(node);
            }
        }
        // 將寫后失效和時間輪中關于該節(jié)點的元素移除
        if (expiresAfterWrite()) {
            writeOrderDeque().remove(node);
        } else if (expiresVariable()) {
            timerWheel().deschedule(node);
        }

        // 同步機制將 node 置為 dead
        synchronized (node) {
            logIfAlive(node);
            makeDead(node);
        }

        if (removed[0]) {
            // 節(jié)點被移除監(jiān)控計數(shù)和節(jié)點移除通知回調(diào)
            statsCounter().recordEviction(node.getWeight(), actualCause[0]);
            notifyRemoval(key, value[0], actualCause[0]);
        }

        return true;
    }
}

該方法比較簡單,是將節(jié)點進行驅(qū)逐的邏輯,在后文中它會被多次復用,需要留一個印象。回到 AddTask 任務的邏輯中,當被添加的元素權重超過最大權重限制時會被直接移除。這種特殊情況試用于指定了權重計算策略的緩存,如果只指定了固定容量,元素權重默認為 1,所以不會直接超過最大緩存數(shù)量限制。

現(xiàn)在我們已經(jīng)將 put 方法中向緩存中添加元素的邏輯介紹完了,接下來需要關注 put 方法中對已存在的相同 key 值元素的處理邏輯:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    static final int MAX_PUT_SPIN_WAIT_ATTEMPTS = 1024 - 1;

    static final long EXPIRE_WRITE_TOLERANCE = TimeUnit.SECONDS.toNanos(1);
    
    final ConcurrentHashMap> data;
    
    @Nullable
    V put(K key, V value, Expiry expiry, boolean onlyIfAbsent) {
        requireNonNull(key);
        requireNonNull(value);

        Node node = null;
        long now = expirationTicker().read();
        int newWeight = weigher.weigh(key, value);
        Object lookupKey = nodeFactory.newLookupKey(key);
        for (int attempts = 1; ; attempts++) {
            Node prior = data.get(lookupKey);
            if (prior == null) {
                // ... 
            }

            // 元素被讀到之后可能已經(jīng)被驅(qū)逐了
            if (!prior.isAlive()) {
                // 自旋嘗試重新從 ConcurrentHashMap 中獲取,再獲取時如果為 null 則執(zhí)行新增邏輯
                if ((attempts & MAX_PUT_SPIN_WAIT_ATTEMPTS) != 0) {
                    Thread.onSpinWait();
                    continue;
                }
                // 如果自旋嘗試后元素仍未被刪除,校驗元素是否處于存活狀態(tài)
                // 如果處于非存活狀態(tài),那么可能這個元素已經(jīng)被破壞,無法被移除,拋出異常
                data.computeIfPresent(lookupKey, (k, n) -> {
                    requireIsAlive(key, n);
                    return n;
                });
                continue;
            }

            V oldValue;
            // 新的過期時間
            long varTime;
            int oldWeight;
            boolean expired = false;
            boolean mayUpdate = true;
            boolean exceedsTolerance = false;
            // 為元素加同步鎖
            synchronized (prior) {
                // 如果此時元素已經(jīng)失效了,那么需要重新循環(huán)
                if (!prior.isAlive()) {
                    continue;
                }
                oldValue = prior.getValue();
                oldWeight = prior.getWeight();
                // oldValue 為 null 證明它被垃圾回收器回收了
                if (oldValue == null) {
                    // 記錄元素創(chuàng)建后的過期時間
                    varTime = expireAfterCreate(key, value, expiry, now);
                    // 驅(qū)逐監(jiān)聽器回調(diào)
                    notifyEviction(key, null, RemovalCause.COLLECTED);
                }
                // 如果元素已經(jīng)過期了
                else if (hasExpired(prior, now)) {
                    // 標記過期標志為 true
                    expired = true;
                    // 記錄元素創(chuàng)建后的過期時間并回調(diào)驅(qū)逐監(jiān)聽器
                    varTime = expireAftexpireAfterCreateerCreate(key, value, expiry, now);
                    notifyEviction(key, oldValue, RemovalCause.EXPIRED);
                }
                // onlyInAbsent 為 true 時不會對已存在 key 的值進行修改
                else if (onlyIfAbsent) {
                    mayUpdate = false;
                    // 記錄元素讀后過期時間
                    varTime = expireAfterRead(prior, key, value, expiry, now);
                } else {
                    // 記錄元素修改后過期時間
                    varTime = expireAfterUpdate(prior, key, value, expiry, now);
                }

                // 需要修改原有 key 的 value 值
                if (mayUpdate) {
                    exceedsTolerance =
                            // 配置了寫后過期策略且已經(jīng)超過寫后時間的容忍范圍
                            (expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)
                                    // 或者配置了可變時間過期策略同樣判斷是否超過時間的容忍范圍
                                    || (expiresVariable() && Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);

                    // 更新值,更新權重,更新寫時間
                    prior.setValue(value, valueReferenceQueue());
                    prior.setWeight(newWeight);
                    setWriteTime(prior, now);

                    // 寫后刷新策略失效
                    discardRefresh(prior.getKeyReference());
                }

                // 更新過期時間
                setVariableTime(prior, varTime);
                // 更新訪問時間
                setAccessTime(prior, now);
            }

            // 根據(jù)不同的情況回調(diào)不同的監(jiān)聽器
            if (expired) {
                notifyRemoval(key, oldValue, RemovalCause.EXPIRED);
            } else if (oldValue == null) {
                notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);
            } else if (mayUpdate) {
                notifyOnReplace(key, oldValue, value);
            }

            // 計算寫后權重變化
            int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;
            // 如果 oldValue 已經(jīng)被回收 或 權重修改前后發(fā)生變更 或 已經(jīng)過期,添加更新任務
            if ((oldValue == null) || (weightedDifference != 0) || expired) {
                afterWrite(new UpdateTask(prior, weightedDifference));
            }
            // 如果超過了時間容忍范圍,添加更新任務
            else if (!onlyIfAbsent && exceedsTolerance) {
                afterWrite(new UpdateTask(prior, weightedDifference));
            } else {
                // 沒有超過時間容忍范圍,更新寫時間
                if (mayUpdate) {
                    setWriteTime(prior, now);
                }
                // 處理讀后操作
                afterRead(prior, now, /* recordHit */ false);
            }

            return expired ? null : oldValue;
        }
    }
}

對于已有元素的變更,會對節(jié)點添加同步鎖,更新它的權重等一系列變量,如果超過 1s 的時間容忍范圍,則會添加 UpdateTask 更新任務,至于處理讀后操作 afterRead 在讀方法中再去介紹。接下來我們需要重新再看一下 afterWrite 方法,其中有部分我們在上文中沒有介紹的邏輯:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    final ReentrantLock evictionLock;
    
    void afterWrite(Runnable task) {
        // 這段邏輯我們在看 AddTask 的邏輯時已經(jīng)看過了,所以略過
        for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
            if (writeBuffer.offer(task)) {
                scheduleAfterWrite();
                return;
            }
            scheduleDrainBuffers();
            Thread.onSpinWait();
        }

        // 以下邏輯用于解決在重試了 100 次后仍然寫入失敗的問題,它會嘗試獲取 evictionLock 同步鎖
        // 直接同步執(zhí)行“維護”方法并執(zhí)行當前任務,但是它并無法解決某個寫入操作執(zhí)行時間很長的問題
        // 發(fā)生這種情況的原因可能是由于執(zhí)行器的所有線程都很忙(可能是寫入此緩存),寫入速率大大超過了消耗速率,優(yōu)先級反轉(zhuǎn),或者執(zhí)行器默默地丟棄了維護任務
        lock();
        try {
            maintenance(task);
        } catch (RuntimeException e) {
            logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", e);
        } finally {
            evictionLock.unlock();
        }
        // 重新調(diào)度異步維護任務,確保維護操作能及時執(zhí)行
        rescheduleCleanUpIfIncomplete();
    }

    void lock() {
        long remainingNanos = WARN_AFTER_LOCK_WAIT_NANOS;
        long end = System.nanoTime() + remainingNanos;
        boolean interrupted = false;
        try {
            for (;;) {
                try {
                    if (evictionLock.tryLock(remainingNanos, TimeUnit.NANOSECONDS)) {
                        return;
                    }
                    logger.log(Level.WARNING, "The cache is experiencing excessive wait times for acquiring "
                            + "the eviction lock. This may indicate that a long-running computation has halted "
                            + "eviction when trying to remove the victim entry. Consider using AsyncCache to "
                            + "decouple the computation from the map operation.", new TimeoutException());
                    evictionLock.lock();
                    return;
                } catch (InterruptedException e) {
                    remainingNanos = end - System.nanoTime();
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // 調(diào)用同步的維護方法時,可能發(fā)生獲取鎖超時,那么再重新開啟一個異步維護調(diào)度
    void rescheduleCleanUpIfIncomplete() {
        // 校驗是否有任務需要被執(zhí)行
        if (drainStatusOpaque() != REQUIRED) {
            return;
        }
        
        // 默認線程池調(diào)度任務執(zhí)行,這個方法我們在上文中已經(jīng)詳細介紹過
        if (executor == ForkJoinPool.commonPool()) {
            scheduleDrainBuffers();
            return;
        }
        
        // 如果自定義了線程池,那么會使用自定義的線程池進行處理
        var pacer = pacer();
        if ((pacer != null) && !pacer.isScheduled() && evictionLock.tryLock()) {
            try {
                if ((drainStatusOpaque() == REQUIRED) && !pacer.isScheduled()) {
                    pacer.schedule(executor, drainBuffersTask, expirationTicker().read(), Pacer.TOLERANCE);
                }
            } finally {
                evictionLock.unlock();
            }
        }
    }
}

寫后操作除了在添加任務到緩沖區(qū)成功后會執(zhí)行維護方法,添加失?。ㄗC明寫入操作非常頻繁)依然會嘗試同步執(zhí)行維護方法和發(fā)起異步維護,用于保證緩存中的任務能夠被及時執(zhí)行,使緩存中元素都處于“預期”狀態(tài)中。接下來我們在看一下 UpdateTask 更新任務的邏輯:

abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {

    final class UpdateTask implements Runnable {
        final int weightDifference;
        final Node node;

        public UpdateTask(Node node, int weightDifference) {
            this.weightDifference = weightDifference;
            this.node = node;
        }

        @Override
        @GuardedBy("evictionLock")
        public void run() {
            // 寫后過期和自定義過期邏輯
            if (expiresAfterWrite()) {
                reorder(writeOrderDeque(), node);
            } else if (expiresVariable()) {
                timerWheel().reschedule(node);
            }
            // 指定了驅(qū)逐策略
            if (evicts()) {
                // 變更節(jié)點權重
                int oldWeightedSize = node.getPolicyWeight();
                node.setPolicyWeight(oldWeightedSize + weightDifference);
                // 如果是窗口區(qū)節(jié)點
                if (node.inWindow()) {
                    // 更新窗口區(qū)權重
                    setWindowWeightedSize(windowWeightedSize() + weightDifference);
                    // 節(jié)點權重超過最大權重限制,直接驅(qū)逐
                    if (node.getPolicyWeight() > maximum()) {
                        evictEntry(node, RemovalCause.SIZE, expirationTicker().read());
                    }
                    // 節(jié)點權重比窗口區(qū)最大值小
                    else if (node.getPolicyWeight() <= windowMaximum()) {
                        onAccess(node);
                    }
                    // 窗口區(qū)包含該節(jié)點且該節(jié)點的權重大于窗口最大權重,則放到頭節(jié)點
                    else if (accessOrderWindowDeque().contains(node)) {
                        accessOrderWindowDeque().moveToFront(node);
                    }
                }
                // 如果是試用區(qū)節(jié)點
                else if (node.inMainProbation()) {
                    // 節(jié)點權重比最大權重限制小
                    if (node.getPolicyWeight() <= maximum()) {
                        onAccess(node);
                    }
                    // 否則將該節(jié)點驅(qū)逐
                    else {
                        evictEntry(node, RemovalCause.SIZE, expirationTicker().read());
                    }
                }
                // 如果是保護區(qū)節(jié)點
                else if (node.inMainProtected()) {
                    // 更新保護區(qū)權重
                    setMainProtectedWeightedSize(mainProtectedWeightedSize() + weightDifference);
                    // 同樣的邏輯
                    if (node.getPolicyWeight() <= maximum()) {
                        onAccess(node);
                    } else {
                        evictEntry(node, RemovalCause.SIZE, expirationTicker().read());
                    }
                }

                // 更新緩存權重大小
                setWeightedSize(weightedSize() + weightDifference);
                // 更新完成后超過最大權重限制執(zhí)行驅(qū)逐操作
                if (weightedSize() > MAXIMUM_CAPACITY) {
                    evictEntries();
                }
            }
            // 配置了訪問后過期
            else if (expiresAfterAccess()) {
                onAccess(node);
            }
        }
    }

    @GuardedBy("evictionLock")
    void onAccess(Node node) {
        if (evicts()) {
            K key = node.getKey();
            if (key == null) {
                return;
            }
            // 更新訪問頻率
            frequencySketch().increment(key);
            // 如果節(jié)點在窗口區(qū),則將其移動到尾節(jié)點
            if (node.inWindow()) {
                reorder(accessOrderWindowDeque(), node);
            }
            // 在試用區(qū)的節(jié)點執(zhí)行 reorderProbation 方法,可能會將該節(jié)點從試用區(qū)晉升到保護區(qū)
            else if (node.inMainProbation()) {
                reorderProbation(node);
            }
            // 否則移動到保護區(qū)的尾結(jié)點
            else {
                reorder(accessOrderProtectedDeque(), node);
            }
            // 更新命中量
            setHitsInSample(hitsInSample() + 1);
        }
        // 配置了訪問過期策略
        else if (expiresAfterAccess()) {
            reorder(accessOrderWindowDeque(), node);
        }
        // 配置了自定義時間過期策略
        if (expiresVariable()) {
            timerWheel().reschedule(node);
        }
    }

    static  void reorder(LinkedDeque> deque, Node node) {
        // 如果節(jié)點存在,將其移動到尾結(jié)點
        if (deque.contains(node)) {
            deque.moveToBack(node);
        }
    }

    @GuardedBy("evictionLock")
    void reorderProbation(Node node) {
        // 檢查試用區(qū)是否包含該節(jié)點,不包含則證明已經(jīng)被移除,則不處理
        if (!accessOrderProbationDeque().contains(node)) {
            return;
        }
        // 檢查節(jié)點的權重是否超過保護區(qū)最大值
        else if (node.getPolicyWeight() > mainProtectedMaximum()) {
            // 如果超過,將該節(jié)點移動到 試用區(qū) 尾巴節(jié)點,保證超重的節(jié)點不會被移動到保護區(qū)
            reorder(accessOrderProbationDeque(), node);
            return;
        }

        // 更新保護區(qū)權重大小
        setMainProtectedWeightedSize(mainProtectedWeightedSize() + node.getPolicyWeight());
        // 在試用區(qū)中移除該節(jié)點
        accessOrderProbationDeque().remove(node);
        // 在保護區(qū)尾節(jié)點中添加
        accessOrderProtectedDeque().offerLast(node);
        // 將該節(jié)點標記為保護區(qū)節(jié)點
        node.makeMainProtected();
    }
}

UpdateTask 修改任務負責變更權重值,并更新節(jié)點所在隊列的順序和訪問頻率,這里我們也能發(fā)現(xiàn),這三個區(qū)域的隊列采用了 LRU 算法,一般情況下,最新被訪問的元素會被移動到尾節(jié)點。到現(xiàn)在,向有固定容量限制的緩存中調(diào)用 put 方法添加元素的邏輯基本已經(jīng)介紹完了,目前對 Caffeine 緩存的了解程度如下所示:

wKgZO2iRqWiAfY6aABD4WGFvKZk608.png

put 添加元素時會先直接添加到 ConcurrentHashMap 中,并在 WriteBuffer 中添加 AddTask/UpdateTask 任務,WriteBuffer 是一個 MPSC 的緩沖區(qū),添加成功后會有加鎖的同步機制在默認的 ForkJoinPool.commonPool() 線程池中提交 PerformCleanupTask 任務,PerformCleanupTask 任務的主要作用是執(zhí)行 maintenance 維護方法,該方法執(zhí)行前需要先獲取同步鎖,單線程消費 WriteBuffer 中的任務。執(zhí)行 AddTask 任務時會將元素先添加到窗口區(qū),如果是 UpdateTask,它會修改三個不同區(qū)域的雙端隊列,這些隊列采用LRU算法,最新被訪問的元素會被放在尾節(jié)點處,并且試用區(qū)的元素被訪問后會被晉升到保護區(qū)尾節(jié)點,元素對應的訪問頻率也會在頻率草圖中更新,如果被添加的節(jié)點權重超過緩存最大權重會直接被驅(qū)逐。(目前維護方法中除了 drainWriteBuffer 方法外,其他步驟還未詳細解釋,之后會在后文中不斷完善)


審核編輯 黃宇

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學習之用,如有內(nèi)容侵權或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 算法
    +關注

    關注

    23

    文章

    4738

    瀏覽量

    96695
  • 源碼
    +關注

    關注

    8

    文章

    678

    瀏覽量

    30833
收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評論

    相關推薦
    熱點推薦

    緩存:從根理解 ConcurrentHashMap

    本文將詳細介紹 ConcurrentHashMap 構(gòu)造方法、添加值方法和擴容操作等源碼實現(xiàn)。 ConcurrentHashMap 是線程安全的哈希表,此哈希表的設計主要目的是在最小化更新操作對哈希
    的頭像 發(fā)表于 08-05 14:48 ?291次閱讀

    本地緩存 Caffeine 中的時間輪(TimeWheel)是什么?

    我們詳細介紹了 Caffeine 緩存添加元素和讀取元素的流程,并詳細解析了配置固定元素數(shù)量驅(qū)逐策略的實現(xiàn)原理。在本文中我們將主要介紹 配置元素過期時間策略的實現(xiàn)原理 ,補全
    的頭像 發(fā)表于 08-05 14:48 ?391次閱讀
    本地<b class='flag-5'>緩存</b> <b class='flag-5'>Caffeine</b> 中的時間輪(TimeWheel)是什么?

    harmony-utilsCacheUtil,緩存工具類

    harmony-utilsCacheUtil,緩存工具類
    的頭像 發(fā)表于 07-04 16:36 ?251次閱讀

    harmony-utilsLRUCacheUtil,LRUCache緩存工具類

    harmony-utilsLRUCacheUtil,LRUCache緩存工具類 harmony-utils 簡介與說明 harmony-utils 一款功能豐富且極易上手的HarmonyOS工具庫
    的頭像 發(fā)表于 07-03 18:11 ?281次閱讀

    高性能緩存設計:如何解決緩存偽共享問題

    緩存行,引發(fā)無效化風暴,使看似無關的變量操作拖慢整體效率。本文從緩存結(jié)構(gòu)原理出發(fā),通過實驗代碼復現(xiàn)偽共享問題(耗時從3709ms優(yōu)化至473ms),解析其底層機制;同時深入剖析高性能緩存C
    的頭像 發(fā)表于 07-01 15:01 ?383次閱讀
    高性能<b class='flag-5'>緩存</b>設計:如何解決<b class='flag-5'>緩存</b>偽共享問題

    請問如何在C++中使用NPU的模型緩存?

    無法確定如何在 C++ 中的 NPU 使用模型緩存
    發(fā)表于 06-24 07:25

    MCU緩存設計

    MCU 設計通過優(yōu)化指令與數(shù)據(jù)的訪問效率,顯著提升系統(tǒng)性能并降低功耗,其核心架構(gòu)與實現(xiàn)策略如下: 一、緩存類型與結(jié)構(gòu) 指令緩存(I-Cache)與數(shù)據(jù)緩存(D-Cache)? I-Ca
    的頭像 發(fā)表于 05-07 15:29 ?662次閱讀

    Nginx緩存配置詳解

    Nginx 是一個功能強大的 Web 服務器和反向代理服務器,它可以用于實現(xiàn)靜態(tài)內(nèi)容的緩存,緩存可以分為客戶端緩存和服務端緩存
    的頭像 發(fā)表于 05-07 14:03 ?830次閱讀
    Nginx<b class='flag-5'>緩存</b>配置<b class='flag-5'>詳解</b>

    nginx中強緩存和協(xié)商緩存介紹

    緩存直接告訴瀏覽器:在緩存過期前,無需與服務器通信,直接使用本地緩存。
    的頭像 發(fā)表于 04-01 16:01 ?596次閱讀

    詳解天神眼C三目方案,跟大疆“撞車”了?

    電子發(fā)燒友網(wǎng)報道(/梁浩斌)最近比亞迪推出的“天神眼”高階智駕系統(tǒng)引爆了行業(yè),將高階智駕從過去的20左右價格,大幅下放至10級別,甚至7.88
    的頭像 發(fā)表于 02-14 01:28 ?4235次閱讀

    ADS4129后級接緩存器,緩存器出現(xiàn)過熱的原因?

    ,焊接沒有問題,同時也注意了緩存器方向問題,AD轉(zhuǎn)換數(shù)據(jù)輸出也有;電不工作時也發(fā)燙,想請教各位其中的原因可能是什么呢?謝謝給位了?。?!
    發(fā)表于 02-07 08:42

    HTTP緩存頭的使用 本地緩存與遠程緩存的區(qū)別

    HTTP緩存頭是一組HTTP響應頭,它們控制瀏覽器和中間代理服務器如何緩存網(wǎng)頁內(nèi)容。合理使用HTTP緩存頭可以顯著提高網(wǎng)站的加載速度和性能,減少服務器的負載。 1. HTTP緩存頭概述
    的頭像 發(fā)表于 12-18 09:41 ?704次閱讀

    緩存——如何選擇合適的本地緩存?

    Guava cache是Google開發(fā)的Guava工具包中一套完善的JVM本地緩存框架,底層實現(xiàn)的數(shù)據(jù)結(jié)構(gòu)類似于ConcurrentHashMap,但是進行了更多的能力拓展,包括緩存過期時間設置、
    的頭像 發(fā)表于 11-17 14:24 ?995次閱讀
    <b class='flag-5'>緩存</b><b class='flag-5'>之</b><b class='flag-5'>美</b>——如何選擇合適的本地<b class='flag-5'>緩存</b>?

    智慧公交是什么?一帶你詳解智慧公交的解決方案!

    智慧公交是什么?一帶你詳解智慧公交的解決方案!
    的頭像 發(fā)表于 11-05 12:26 ?1264次閱讀
    智慧公交是什么?一<b class='flag-5'>文</b>帶你<b class='flag-5'>詳解</b>智慧公交的解決方案!

    tlv320aic3254+purepath studio如何在PPS里面實現(xiàn)32位數(shù)據(jù)的緩存

    問題: 1. 如何在PPS里面實現(xiàn)32位數(shù)據(jù)的緩存? 2.如何搭建最小均方算法(LMS)?
    發(fā)表于 11-01 07:39