且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

Juc16_LongAdder引入、原理、Striped64、分散热点思想、深度解析LongAdder源码、LongAdder和AtomicLong区别(三)

更新时间:2021-12-08 00:20:54

②. longAccumulate(x, null, uncontended)


  • ①. 线程hash值:probe


final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    //存储线程的probe值
    int h;
    //如果getProbe()方法返回0,说明随机数未初始化
    if ((h = getProbe()) == 0) { //这个if相当于给当前线程生成一个非0的hash值
        //使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
        ThreadLocalRandom.current(); // force initialization
        //重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true
        h = getProbe();
        //重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈
        //wasUncontended竞争状态为true
        wasUncontended = true;
    }


Juc16_LongAdder引入、原理、Striped64、分散热点思想、深度解析LongAdder源码、LongAdder和AtomicLong区别(三)


②. 刚刚初始化Cell[ ]数组(首次新建)


    //CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组
    /*
    cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁
    cells == as == null  是成立的
    casCellsBusy:通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,
    返回true,第一次进来没人抢占cell单元格,肯定返回true
    **/
    else if (cellsBusy == 0 && cells == as && casCellsBusy()) { 
        //是否初始化的标记
        boolean init = false;
        try {                           // Initialize table(新建cells)
            // 前面else if中进行了判断,这里再次判断,采用双端检索的机制
            if (cells == as) {
                //如果上面条件都执行成功就会执行数组的初始化及赋值操作,Cell[] rs = new Cell[2]标识数组的长度为2
                Cell[] rs = new Cell[2];
                //rs[h & 1] = new Cell(x)表示创建一个新的cell元素,value是x值,默认为1
                //h & 1 类似于我们之前hashmap常用到的计算散列桶index的算法,
                //通常都是hash&(table.len-1),同hashmap一个意思
                //看这次的value是落在0还是1
                rs[h & 1] = new Cell(x);
                cells = rs;
                init = true;
            }
        } finally {
            cellsBusy = 0;
        }
        if (init)
            break;
    }


③. 兜底(多个线程尝试CAS修改失败的线程会走这个分支)


    //CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作
    //这种情况是cell中都CAS失败了,有一个兜底的方法
    //该分支实现直接操作base基数,将值累加到base上,
    //也即其他线程正在初始化,多个线程正在更新base的值
    else if (casBase(v = base, ((fn == null) ? v + x :
                                fn.applyAsLong(v, x))))
        break;     


④. Cell数组不再为空且可能存在Cell数组扩容


for (;;) {
    Cell[] as; Cell a; int n; long v;
    if ((as = cells) != null && (n = as.length) > 0) { // CASE1:cells已经初始化了
        // 当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用
        if ((a = as[(n - 1) & h]) == null) {
            //Cell[]数组没有正在扩容
            if (cellsBusy == 0) {       // Try to attach new Cell
                //先创建一个Cell
                Cell r = new Cell(x);   // Optimistically create
                //尝试加锁,加锁后cellsBusy=1
                if (cellsBusy == 0 && casCellsBusy()) { 
                    boolean created = false;
                    try {               // Recheck under lock
                        Cell[] rs; int m, j; //将cell单元赋值到Cell[]数组上
                        //在有锁的情况下再检测一遍之前的判断 
                        if ((rs = cells) != null &&
                            (m = rs.length) > 0 &&
                            rs[j = (m - 1) & h] == null) {
                            rs[j] = r;
                            created = true;
                        }
                    } finally {
                        cellsBusy = 0;//释放锁
                    }
                    if (created)
                        break;
                    continue;           // Slot is now non-empty
                }
            }
            collide = false;
        }
        /**
        wasUncontended表示cells初始化后,当前线程竞争修改失败
        wasUncontended=false,表示竞争激烈,需要扩容,这里只是重新设置了这个值为true,
        紧接着执行advanceProbe(h)重置当前线程的hash,重新循环
        */
        else if (!wasUncontended)       // CAS already known to fail
            wasUncontended = true;      // Continue after rehash
        //说明当前线程对应的数组中有了数据,也重置过hash值
        //这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环
        else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                     fn.applyAsLong(v, x))))
            break;
        //如果n大于CPU最大数量,不可扩容,并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试
        else if (n >= NCPU || cells != as)
            collide = false;    //扩容标识设置为false,标识永远不会再扩容
        //如果扩容意向collide是false则修改它为true,然后重新计算当前线程的hash值继续循环
        else if (!collide) 
            collide = true;
        //锁状态为0并且将锁状态修改为1(持有锁) 
        else if (cellsBusy == 0 && casCellsBusy()) { 
            try {
                if (cells == as) {      // Expand table unless stale
                    //按位左移1位来操作,扩容大小为之前容量的两倍
                    Cell[] rs = new Cell[n << 1];
                    for (int i = 0; i < n; ++i)
                        //扩容后将之前数组的元素拷贝到新数组中
                        rs[i] = as[i];
                    cells = rs; 
                }
            } finally {
                //释放锁设置cellsBusy=0,设置扩容状态,然后进行循环执行
                cellsBusy = 0;
            }
            collide = false;
            continue;                   // Retry with expanded table
        }
        h = advanceProbe(h);
    }


Juc16_LongAdder引入、原理、Striped64、分散热点思想、深度解析LongAdder源码、LongAdder和AtomicLong区别(三)


Juc16_LongAdder引入、原理、Striped64、分散热点思想、深度解析LongAdder源码、LongAdder和AtomicLong区别(三)


③. Striped64.java


Striped64.java
/**
    1.LongAdder继承了Striped64类,来实现累加功能,它是实现高并发累加的工具类
    2.Striped64的设计核心思路就是通过内部的分散计算来避免竞争
    3.Striped64内部包含一个base和一个Cell[] cells数组,又叫hash表
    4.没有竞争的情况下,要累加的数通过cas累加到base上;如果有竞争的话,
    会将要累加的数累加到Cells数组中的某个cell元素里面
*/
abstract class Striped64 extends Number {
    //CPU数量,即Cells数组的最大长度
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    //存放Cell的hash表,大小为2的幂
    transient volatile Cell[] cells;
    /*
    1.在开始没有竞争的情况下,将累加值累加到base;
    2.在cells初始化的过程中,cells处于不可用的状态,这时候也会尝试将通过cas操作值累加到base
    */
    transient volatile long base;
    /*
    cellsBusy,它有两个值0或1,它的作用是当要修改cells数组时加锁,
    防止多线程同时修改cells数组(也称cells表),0为无锁,1位加锁,加锁的状况有三种:
    (1). cells数组初始化的时候;
    (2). cells数组扩容的时候;
    (3).如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候;

    */
    transient volatile int cellsBusy;
    
    //低并发状态,还没有新建cell数组且写入进入base,刚好够用
    //base罩得住,不用上cell数组
    final boolean casBase(long cmp, long val) {
        //当前对象,在base位置上,将base(类似于AtomicLong中全局的value值),将base=0(cmp)改为1(value)
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
    
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //存储线程的probe值
        int h;
        //如果getProbe()方法返回0,说明随机数未初始化
        if ((h = getProbe()) == 0) { //这个if相当于给当前线程生成一个非0的hash值
            //使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
            ThreadLocalRandom.current(); // force initialization
            //重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true
            h = getProbe();
            //重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈,wasUncontended竞争状态为true
            wasUncontended = true;
        }
        //如果hash取模映射得到的Cell单元不是null,则为true,此值也可以看作是扩容意向
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) { // CASE1:cells已经初始化了
                // 当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用
                if ((a = as[(n - 1) & h]) == null) {
                    //Cell[]数组没有正在扩容
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        //先创建一个Cell
                        Cell r = new Cell(x);   // Optimistically create
                        //尝试加锁,加锁后cellsBusy=1
                        if (cellsBusy == 0 && casCellsBusy()) { 
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j; //将cell单元赋值到Cell[]数组上
                                //在有锁的情况下再检测一遍之前的判断 
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;//释放锁
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                /**
                wasUncontended表示cells初始化后,当前线程竞争修改失败
                wasUncontended=false,表示竞争激烈,需要扩容,这里只是重新设置了这个值为true,
                紧接着执行advanceProbe(h)重置当前线程的hash,重新循环
                */
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //说明当前线程对应的数组中有了数据,也重置过hash值
                //这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                //如果n大于CPU最大数量,不可扩容,并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试
                else if (n >= NCPU || cells != as)
                    collide = false;    //扩容标识设置为false,标识永远不会再扩容
                //如果扩容意向collide是false则修改它为true,然后重新计算当前线程的hash值继续循环
                else if (!collide) 
                    collide = true;
                //锁状态为0并且将锁状态修改为1(持有锁) 
                else if (cellsBusy == 0 && casCellsBusy()) { 
                    try {
                        if (cells == as) {      // Expand table unless stale
                            //按位左移1位来操作,扩容大小为之前容量的两倍
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                //扩容后将之前数组的元素拷贝到新数组中
                                rs[i] = as[i];
                            cells = rs; 
                        }
                    } finally {
                        //释放锁设置cellsBusy=0,设置扩容状态,然后进行循环执行
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            //CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组
            /*
            cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁
            cells == as == null  是成立的
            casCellsBusy:通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true,第一次进来没人抢占cell单元格,肯定返回true
            **/
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) { 
                //是否初始化的标记
                boolean init = false;
                try {                           // Initialize table(新建cells)
                    // 前面else if中进行了判断,这里再次判断,采用双端检索的机制
                    if (cells == as) {
                        //如果上面条件都执行成功就会执行数组的初始化及赋值操作,Cell[] rs = new Cell[2]标识数组的长度为2
                        Cell[] rs = new Cell[2];
                        //rs[h & 1] = new Cell(x)表示创建一个新的cell元素,value是x值,默认为1
                        //h & 1 类似于我们之前hashmap常用到的计算散列桶index的算法,通常都是hash&(table.len-1),同hashmap一个意思
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作
            //这种情况是cell中都CAS失败了,有一个兜底的方法
            //该分支实现直接操作base基数,将值累加到base上,也即其他线程正在初始化,多个线程正在更新base的值
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
    
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
}