Java高并发-容器

Java高并发-容器

前言:在java开发中我们肯定会大量的使用集合,在这里我将总结常见的集合类,每个集合类的优点和缺点,以便我们能更好的使用集合。下面我用一幅图来表示
aa
其中淡绿色的表示接口,红色的表示我们经常使用的类

单列模式

内部类-线程安全的单列模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Singleton{
private Singleton(){
System.out.println("Singleton");
}

private static class Inner {
private static Singleton s = new Singleton();
}

private static Singleton getSingle(){
return Inner.s;
}

public static void main(String[] args) {
Thread[] ths = new Thread[200];
for(int i=0; i<this.length; i++){
ths[i] = new Thread(()->{
Singleton.getSingle();
});
}
Arrays.asList(ths).forEach(o->o.start());
}
}

基本概念

Java容器类类库的用途是保存对象,java cocurrent包提供了很多并发容器,在提供并发控制的前提下,通过优化,提升性能。

  1. Collection

    一个独立元素的序列,这些元素都服从一条或多条规则。其中List必须按照插入的顺序保存元素、Set不能有重复的元素、Queue按照排队规则来确定对象的产生顺序(通常也是和插入顺序相同)

  2. Map

    一组成对的值键对对象,允许用键来查找值。ArrayList允许我们用数字来查找值,它是将数字和对象联系在一起。而Map允许我们使用一个对象来查找某个对象,它也被称为关联数组。或者叫做字典。

  3. List

    List承诺可以将元素维护在特定的序列中。

    • 基本的ArrayList:

      它的优点在于随机访问元素快,但是在中间插入和移除比较慢,ArrayList其实就是采用的是数组(默认是长度为10的数组)。所有ArrayList在读取的时候是具有和数组一样的效率,它的时间复杂度为1。
      插入尾部就是elementData[size++] = e;当然中间会进行扩容

    • LinkedList

      它是通过代价较低在List中间进行插入和移除,提供了优化的顺序访问,但是在随机访问方面相对较慢,ListedList采用的是链式存储。链式存储就会定一个节点Node。包括三部分前驱节点、后继节点以及data值

    • Set

      Set也是一个集合,但是他的特点是不可以有重复的对象,所以Set最常用的就是测试归属性,很容易的询问出某个对象是否存在Set中

      • HashSet

        HashSet查询速度比较快,但是存储的元素是随机的并没有排序

      • TreeSet

        TreeSet是将元素存储红-黑树结构中,所以存储的结果是有顺序的

  4. Queue

    Queue是队列,队列是先进先出的容器,就是从容器的一端放入元素,从另一端取出,并且元素放入容器的顺序和取出的顺序是相同的。LinkedList提供了对Queue的实现,LinkedList向上转型为Queue。其中Queue有offer、peek、element、pool、remove等方法

    offer是将元素插入队尾,返回false表示添加失败。peek和element都将在不移除的情况下返回对头,但是peek在对头为null的时候返回null,而element会抛出NoSuchElementException异常。poll和remove方法将移除并返回对头,但是poll在队列为null,而remove会抛出NoSuchElementException异常

  5. Map

    Map在实际开发中使用非常广,特别是HashMap,想象一下我们要保存一个对象中某些元素的值,如果我们在创建一个对象显得有点麻烦,这个时候我们就可以用上map了,HashMap采用是散列函数所以查询的效率是比较高的,如果我们需要一个有序的我们就可以考虑使用TreeMap。

  6. Iterator和Foreach

    现在foreach语法主要作用于数组,但是他也可以应用于所有的Collection对象。Collection之所以能够使用foreach是由于继承了Iterator这个接口

    foreach循环最终是转换成 for (Iterator it=iterator;iterators.hasNext();)只不过jdk帮我们隐藏我们无法查看

    如果对集合有删除操作,请记住一定要加上iterator.next()

  7. Collections和Arrays

    Collections.addAll和Arrays.asList

  8. 总结

    • 数组是将数字和对象联系起来,它保存明确的对象,查询对象时候不需要对查询结果进行转换,它可以是多维的,可以保存基本类型的数据,但是数组一旦生成,其容量不能改变。所以数组是不可以直接删除和添加元素。

    • Collection保存单一的元素,而Map保存相关联的值键对,有了Java泛型,可以指定容器存放对象类型,不会将错误类型的对象放在容器中,取元素时候也不需要转型。而且Collection和Map都可以自动调整其尺寸。容器不可以持有基本类型。

    1. 像数组一样,List也建立数字索性和对象的关联,因此,数组和List都是排好序的容器,List可以自动扩容

    2. 如果需要大量的随机访问就要使用ArrayList,如果要经常从中间插入和删除就要使用LinkedList。

    3. 各种Queue和Stack由LinkedList支持

    4. Map是一种将对象(而非数字)与对象相关联的设计。HashMap用于快速访问,TreeMap保持键始终处于排序状态,所以不如HashMap快,而LinkedHashMap保持元素插入的顺序,但是也通过散列提供了快速访问的能力

    5. Set不接受重复的元素,HashSet提供最快的访问能力,TreeSet保持元素排序状态,LinkedHashSet以插入顺序保存元素。

为什么JUC需要提供并发容器?

多数容器类都是非线程安全的,即使部分容器是线程安全的,由于使用sychronized进行锁控制,导致读/写均需进行锁操作,性能很低。

java collection framework可以通过以下两种方式实现容器对象读写的并发控制,但是都是基于sychronized锁控制机制,性能低:

  1. 使用sychronized方法进行并发控制,如HashTable 和 Vector。以下代码为Vector.add(e)的java8实现代码:

    1
    2
    3
    4
    5
    6
    public synchronized boolean add(E e) {
    modCount++;
    ensureCapacityHelper(elementCount + 1);
    elementData[elementCount++] = e;
    return true;
    }
  2. 使用工具类Collections将非线程安全容器包装成线程安全容器。以下代码是Collections.synchronizedMap(Map<K,V> m)将原始Map包装为线程安全的SynchronizedMap,但是实际上最终操作时,仍然是在被包装的原始map上进行,只是SynchronizedMap的所有方法都加上了synchronized锁控制。

    1
    2
    3
    public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
    return new SynchronizedMap<>(m); //将原始Map包装为线程安全的SynchronizedMap
    }

JUC并发容器

CAS

CAS是一种无锁的非阻塞算法,全称为:Compare-and-swap(比较并交换),大致思路是:先比较目标对象现值是否和旧值一致,如果一致,则更新对象为新值;如果不一致,则表明对象已经被其他线程修改,直接返回

1
2
3
4
5
6
7
function cas(p : pointer to int, old : int, new : int) returns bool {
if *p ≠ old {
return false
}
*p ← new
return true
}
  • ConcurrentHashMap

    ConcurrentHashMap实现了HashTable的所有功能,线程安全,但却在检索元素时不需要锁定,因此效率更高。

    ConcurrentHashMap的key 和 value都不允许null出现。原因在于ConcurrentHashMap不能区分出value是null还是没有map上,相对的HashMap却可以允许null值,在于其使用在单线程环境下,可以使用containKey(key)方法提前判定是否能map上,从而区分这两种情况,但是ConcurrentHashMap在多线程使用上下文中则不能这么判定

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public class ConcurrentHashMap_Test{
    public static void main(String[] args) {
    Map<String, String> map = new ConcurrentHashMap<>();
    // Map<String, String> map = new ConcurrentSkipListMap<>();

    // Map<String, String> map = new HashMap<>(); // Collections.synchronizedXXXX

    Random r = new Random();
    Thread[] ths = new Thread[100];
    CountDownLatch latch = new CountDownLatch(ths.length);//阀门,阀门计数器
    long start = System.currentTimeMillis();
    for(int i=0;i<ths.length;i++) {
    ths[i] = new Thread(()->{
    for(int j=0;j<10000;j++) map.put("A" + r.nextInt(1000), "A" + r.nextInt(1000));
    latch.countDown();
    });
    }

    Arrays.asList(ths).forEach(t->t.start());
    try{
    latch.await();
    } catch(Exception e) {

    }

    long end = System.currentTimeMillis();
    System.out.println(end - start);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    public V put(K key, V value) {
    return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
    if (tab == null || (n = tab.length) == 0)
    tab = initTable();
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,
    new Node<K,V>(hash, key, value, null)))
    break; // no lock when adding to empty bin,当前hash对应的bin(桶)还不存在时,使用cas写入; 写入失败,则再次尝试。
    }
    else if ((fh = f.hash) == MOVED) //如果tab[i]不为空并且hash值为MOVED,说明该链表正在进行transfer操作,返回扩容完成后的table
    tab = helpTransfer(tab, f);
    else {
    V oldVal = null;
    synchronized (f) { // 加锁保证线程安全,但不是对整个table加锁,只对当前的Node加锁,避免其他线程对当前Node进行写操作。
    if (tabAt(tab, i) == f) {
    if (fh >= 0) {
    binCount = 1;
    for (Node<K,V> e = f;; ++binCount) {
    K ek;
    if (e.hash == hash &&
    ((ek = e.key) == key ||
    (ek != null && key.equals(ek)))) { //如果在链表中找到值为key的节点e,直接设置e.val = value即可
    oldVal = e.val;
    if (!onlyIfAbsent)
    e.val = value;
    break;
    }
    Node<K,V> pred = e; //如果没有找到值为key的节点,直接新建Node并加入链表即可
    if ((e = e.next) == null) {
    pred.next = new Node<K,V>(hash, key,
    value, null);
    break;
    }
    }
    }
    else if (f instanceof TreeBin) { //如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作
    Node<K,V> p;
    binCount = 2;
    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
    value)) != null) {
    oldVal = p.val;
    if (!onlyIfAbsent)
    p.val = value;
    }
    }
    }
    }
    if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD) //如果节点数大于阈值,则转换链表结构为红黑树结构
    treeifyBin(tab, i);
    if (oldVal != null)
    return oldVal;
    break;
    }
    }
    }
    addCount(1L, binCount); //计数增加1,有可能触发transfer操作(扩容)
         return null;
        }

    在jdk1.8中主要做了2方面的改进

    改进一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。

    改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。

    另外,在其他方面也有一些小的改进,比如新增字段 transient volatile CounterCell[] counterCells; 可方便的计算hashmap中所有元素的个数,性能大大优于jdk1.7中的size()方法。

  • ConcurrentLinkedQueue

    ConcurrentLinkedQueue使用链表作为数据结构,它采用无锁操作,可以任务是高并发环境下性能最好的队列。

    ConcurrentLinkedQueue是非阻塞线程安全队列,无界,故不太适合做生产者消费者模式,而LinkedBlockingQueue是阻塞线程安全队列,可以做到有界,通常用于生产者消费者模式。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public class TicketSeller{
    static Queue<String> tickets = new ConcurrentLinkedQueue<>();

    static{
    for(int i=0;i<1000; i++) tickets.add("ticket :" + i);
    }

    public static void main(String[] args) {
    for(int i=0;i<10;i++) {
    new Thread(()->{
    while(true){
    String s = ticket.poll();//同步,从头开始获取
    if(s==null) break;
    else System.out.println("sale:" + s)
    }
    }).start();
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    /**
    * 不断尝试:找到最新的tail节点,不断尝试想最新的tail节点后面添加新节点
    */
    public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) { //不断尝试:找到最新的tail节点,不断尝试想最新的tail节点后面添加新节点。
    Node<E> q = p.next;
    if (q == null) {
    // p is last node
    if (p.casNext(null, newNode)) {
    // Successful CAS is the linearization point
    // for e to become an element of this queue,
    // and for newNode to become "live".
    if (p != t) // hop two nodes at a time //t引用有可能并不是真实的tail节点的引用,多线程操作时,允许该情况出现,只要能保证每次新增元素是在真实的tail节点上添加的即可。
    casTail(t, newNode); // Failure is OK. 即使失败,也不影响下次offer新的元素,反正后面会试图寻找到最新的真实tail元素
    return true;
    }
    // Lost CAS race to another thread; re-read next CAS竞争失败,再次尝试
    }
    else if (p == q) //遇到哨兵节点(next和item相同,空节点或者删除节点),从head节点重新遍历。确保找到最新的tail节点
    // We have fallen off list. If tail is unchanged, it
    // will also be off-list, in which case we need to
    // jump to head, from which all live nodes are always
    // reachable. Else the new tail is a better bet.
    p = (t != (t = tail)) ? t : head;
    else
    // Check for tail updates after two hops.
    p = (p != t && t != (t = tail)) ? t : q; //java中'!='运算符不是原子操作,故使用t != (t = tail)做一次判定,如果tail被其他线程更改,则直接使用最新的tail节点返回。
    }
    }
  • CopyOnWriteArrayList

    CopyOnWriteArray 设计了CopyOnWriteArrayList和CopyOnWriteArraySet

    CopyOnWriteArrayList提供高效地读取操作,使用在读多写少的场景。CopyOnWriteArrayList读取操作不用加锁,且是安全的;写操作时,先copy一份原有数据数组,再对复制数据进行写入操作,最后将复制数据替换原有数据,从而保证写操作不影响读操作。

    1
    2
    3
    public class CopyOnWriteListTest{

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;
    /** The lock protecting all mutators */
    final transient ReentrantLock lock = new ReentrantLock();
    /**
    * Sets the array.
    */
    final void setArray(Object[] a) {
    array = a;
    }

    /**
    * Gets the array. Non-private so as to also be accessible
    * from CopyOnWriteArraySet class.
    */
    final Object[] getArray() {
    return array;
    }

    public E get(int index) {
    return get(getArray(), index);
    }

    /**
    * Appends the specified element to the end of this list.
    *
    * @param e element to be appended to this list
    * @return {@code true} (as specified by {@link Collection#add})
    */
    public boolean add(E e) {
    //独占锁
    final ReentrantLock lock = this.lock;
    lock.lock(); //写 互斥 读
    try {
    Object[] elements = getArray();
    int len = elements.length;
    //复制一个新的数组newElements
    Object[] newElements = Arrays.copyOf(elements, len + 1);
    newElements[len] = e; //对副本进行修改操作
    setArray(newElements); //将修改后的副本替换原有的数据
    return true;
    } finally {
    lock.unlock();
    }
    }

    }

    在add操作中通过一个共享的ReentrantLock来获取锁,这样可以防止多线程下多个线程同时修改容器内容。获取锁后通过Arrays.copyOf复制了一个新的容器,然后对新的容器进行了修改,最后直接通过setArray将原数组引用指向了新的数组,避免了在修改过程中迭代数据出现错误。get操作由于是读操作,未加锁,直接读取就行。

    缺点:

    • 性能低下,会不断申请新的空间,同时会造成频繁的GC
    • 数据一致性问题,修改中copy了新的数组进行替换,同时旧数组如果还在被使用,那么新的数据就不能被及时读取到,这样就造成了数据不一致
  • ConcurrentSkipListMap

    ConcurrentSkipListMap内部使用跳表(SkipList)这种数据结构来实现,他的结构相对红黑树来说非常简单理解,实现起来也相对简单,而且在理论上它的查找、插入、删除时间复杂度都为log(n)。在并发上,ConcurrentSkipListMap采用无锁的CAS+自旋来控制。

    SkipList(跳表)是一种随机性的数据结构,用于替代红黑树,因为它在高并发的情况下,性能优于红黑树。跳表实际上是以空间换取时间。

    ConcurrentSkipListMap的实现就是实现了一个无锁版的跳表,主要是利用无锁的链表的实现来管理跳表底层,同样利用CAS来完成替换。

    通过分析ConcurrentSkipListMap的put方法来理解跳表以及CAS自旋并发控制:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z; // added node
    if (key == null)
    throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
    for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //查找前继节点
    if (n != null) { //查找到前继节点
    Object v; int c;
    Node<K,V> f = n.next; //获取后继节点的后继节点
    if (n != b.next) //发生竞争,两次节点获取不一致,并发导致
    break;
    if ((v = n.value) == null) { // 节点已经被删除
    n.helpDelete(b, f);
    break;
    }
    if (b.value == null || v == n)
    break;
    if ((c = cpr(cmp, key, n.key)) > 0) { //进行下一轮查找,比当前key大
    b = n;
    n = f;
    continue;
    }
    if (c == 0) { //相等时直接cas修改值
    if (onlyIfAbsent || n.casValue(v, value)) {
    @SuppressWarnings("unchecked") V vv = (V)v;
    return vv;
    }
    break; // restart if lost race to replace value
    }
    // else c < 0; fall through
    }

    z = new Node<K,V>(key, value, n); //9. n.key > key > b.key
    if (!b.casNext(n, z)) //cas修改值
    break; // restart if lost race to append to b
    break outer;
    }
    }

    int rnd = ThreadLocalRandom.nextSecondarySeed(); //获取随机数
    if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
    int level = 1, max;
    while (((rnd >>>= 1) & 1) != 0) // 获取跳表层级
    ++level;
    Index<K,V> idx = null;
    HeadIndex<K,V> h = head;
    if (level <= (max = h.level)) { //如果获取的调表层级小于等于当前最大层级,则直接添加,并将它们组成一个上下的链表
    for (int i = 1; i <= level; ++i)
    idx = new Index<K,V>(z, idx, null);
    }
    else { // try to grow by one level //否则增加一层level,在这里体现为Index<K,V>数组
    level = max + 1; // hold in array and later pick the one to use
    @SuppressWarnings("unchecked")Index<K,V>[] idxs =
    (Index<K,V>[])new Index<?,?>[level+1];
    for (int i = 1; i <= level; ++i)
    idxs[i] = idx = new Index<K,V>(z, idx, null);
    for (;;) {
    h = head;
    int oldLevel = h.level;
    if (level <= oldLevel) // lost race to add level
    break;
    HeadIndex<K,V> newh = h;
    Node<K,V> oldbase = h.node;
    for (int j = oldLevel+1; j <= level; ++j) //新添加的level层的具体数据
    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
    if (casHead(h, newh)) {
    h = newh;
    idx = idxs[level = oldLevel];
    break;
    }
    }
    }
    // 逐层插入数据过程
    splice: for (int insertionLevel = level;;) {
    int j = h.level;
    for (Index<K,V> q = h, r = q.right, t = idx;;) {
    if (q == null || t == null)
    break splice;
    if (r != null) {
    Node<K,V> n = r.node;
    // compare before deletion check avoids needing recheck
    int c = cpr(cmp, key, n.key);
    if (n.value == null) {
    if (!q.unlink(r))
    break;
    r = q.right;
    continue;
    }
    if (c > 0) {
    q = r;
    r = r.right;
    continue;
    }
    }

    if (j == insertionLevel) {
    if (!q.link(r, t))
    break; // restart
    if (t.node.value == null) {
    findNode(key);
    break splice;
    }
    if (--insertionLevel == 0)
    break splice;
    }

    if (--j >= insertionLevel && j < level)
    t = t.down;
    q = q.down;
    r = q.right;
    }
    }
    }
    return null;
    }

    这里的插入方法很复杂,可以分为3大步来理解:第一步获取前继节点后通过CAS来插入节点;第二步对level层数进行判断,如果大于最大层数,则插入一层;第三步插入对应层的数据。整个插入过程全部通过CAS自旋的方式保证并发情况下的数据正确性。

  • ConcurrentLinkedQueue, ArrayBlockingQueue与LinkedBlockingQueue

    三者区别与联系:

    联系,三者都是线程安全的。

    区别,就是并发和阻塞,前者为并发队列,因为采用 cas 算法,所以能够高并发的处理;后2者采用锁机制,所以是阻塞的。注意点就是前者由于采用cas算法,虽然能高并发,但cas的特点造成操作的危险性,怎么危险性可以去查一下cas算法(但一些多消费性的队列还是用的它,原因看下边使用场景中的说明)

    后2者区别:

    联系,第2和第3都是阻塞队列,都是采用锁,都有阻塞容器Condition,通过Condition阻塞容量为空时的取操作和容量满时的写操作第。

    区别,第2就一个整锁,第3是2个锁,所以第2、第3的锁机制不一样,第3比第2吞吐量大,并发性能也比第2高。

    后2者的具体信息:LinkedBlockingQueue是BlockingQueue的一种使用Link List的实现,它对头和尾(取和添加操作)采用两把不同的锁,相对于 ArrayBlockingQueue 提高了吞吐量。它也是一种阻塞型的容器,适合于实现 “消费者生产者” 模式。
    ArrayBlockingQueue是对BlockingQueue的一个数组实现,它使用一把全局的锁并行对queue的读写操作,同时使用两个Condition阻塞容量为空时的取操作和容量满时的写操作。

    正因为LinkedBlockingQueue使用两个独立的锁控制数据同步,所以可以使存取两种操作并行执行,从而提高并发效率。而 ArrayBlockingQueue使用一把锁,造成在存取两种操作争抢一把锁,而使得性能相对低下。LinkedBlockingQueue可以不设置队列容量,默认为Integer.MAX_VALUE.其容易造成内存溢出,一般要设置其值。

    使用场景总结:

    适用阻塞队列的好处:多线程操作共同的队列时不需要额外的同步,另外就是队列会自动平衡负载,即那边(生产与消费两边)处理快了就会被阻塞掉,从而减少两边的处理速度差距,自动平衡负载这个特性就造成它能被用于多生产者队列,因为你生成多了(队列满了)你就要阻塞等着,直到消费者消费使队列不满你才可以继续生产。 当许多线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。

    • LinkedBlockingQueue 多用于任务队列(单线程发布任务,任务满了就停止等待阻塞,当任务被完成消费少了又开始负载 发布任务)

    • ConcurrentLinkedQueue 多用于消息队列(多个线程发送消息,先随便发来,不计并发的-cas特点)

    多个生产者,对于LBQ性能还算可以接受;但是多个消费者就不行了mainLoop需要一个timeout的机制,否则空转,cpu会飙升的。LBQ正好提供了timeout的接口,更方便使用如果CLQ,那么我需要收到处理sleep

    单生产者,单消费者用 LinkedBlockingqueue
    多生产者,单消费者用 LinkedBlockingqueue
    单生产者,多消费者用 ConcurrentLinkedQueue
    多生产者,多消费者用 ConcurrentLinkedQueue

    对上边总结:

    如消息队列,好多client发来消息,根据client发送先后放入队列中,先发送的就先放进来,然后由于队列是先进先出,是一个一个出来的,所以不涉及到线程安全问题,所以用LinkedBlockingqueue队列。比如还拿上边消息队列那个例子,由于队列是一个一个出来的,出来一个消息协议体就由线程池分配一个线程去处理这个消息体,这个消息体对于线程池来说谈不上共享不共享的问题,即不会多个线程去抢同一个消息体去执行,所以就不需要用线程安全的队列结构了;那假如一种情况,队列里仍然是一个一个的出来,但是出来的这个元素是 线程池共享的,即大家线程都需要用到这个从队列里出来的这个元素,也就是多消费者消费同一个东西这种情况,所以就要用线程安全的队列了,即ConcurrentLinkedQueue。

-------------本文结束感谢您的阅读-------------
Dean Wang wechat