Java高并发-容器
前言:在java开发中我们肯定会大量的使用集合,在这里我将总结常见的集合类,每个集合类的优点和缺点,以便我们能更好的使用集合。下面我用一幅图来表示
其中淡绿色的表示接口,红色的表示我们经常使用的类
单列模式
内部类-线程安全的单列模式
1 | public class Singleton{ |
基本概念
Java容器类类库的用途是保存对象,java cocurrent包提供了很多并发容器,在提供并发控制的前提下,通过优化,提升性能。
Collection
一个独立元素的序列,这些元素都服从一条或多条规则。其中List必须按照插入的顺序保存元素、Set不能有重复的元素、Queue按照排队规则来确定对象的产生顺序(通常也是和插入顺序相同)
Map
一组成对的值键对对象,允许用键来查找值。ArrayList允许我们用数字来查找值,它是将数字和对象联系在一起。而Map允许我们使用一个对象来查找某个对象,它也被称为关联数组。或者叫做字典。
List
List承诺可以将元素维护在特定的序列中。
基本的ArrayList:
它的优点在于随机访问元素快,但是在中间插入和移除比较慢,ArrayList其实就是采用的是数组(默认是长度为10的数组)。所有ArrayList在读取的时候是具有和数组一样的效率,它的时间复杂度为1。
插入尾部就是elementData[size++] = e;当然中间会进行扩容LinkedList
它是通过代价较低在List中间进行插入和移除,提供了优化的顺序访问,但是在随机访问方面相对较慢,ListedList采用的是链式存储。链式存储就会定一个节点Node。包括三部分前驱节点、后继节点以及data值
Set
Set也是一个集合,但是他的特点是不可以有重复的对象,所以Set最常用的就是测试归属性,很容易的询问出某个对象是否存在Set中
HashSet
HashSet查询速度比较快,但是存储的元素是随机的并没有排序
TreeSet
TreeSet是将元素存储红-黑树结构中,所以存储的结果是有顺序的
Queue
Queue是队列,队列是先进先出的容器,就是从容器的一端放入元素,从另一端取出,并且元素放入容器的顺序和取出的顺序是相同的。LinkedList提供了对Queue的实现,LinkedList向上转型为Queue。其中Queue有offer、peek、element、pool、remove等方法
offer是将元素插入队尾,返回false表示添加失败。peek和element都将在不移除的情况下返回对头,但是peek在对头为null的时候返回null,而element会抛出NoSuchElementException异常。poll和remove方法将移除并返回对头,但是poll在队列为null,而remove会抛出NoSuchElementException异常
Map
Map在实际开发中使用非常广,特别是HashMap,想象一下我们要保存一个对象中某些元素的值,如果我们在创建一个对象显得有点麻烦,这个时候我们就可以用上map了,HashMap采用是散列函数所以查询的效率是比较高的,如果我们需要一个有序的我们就可以考虑使用TreeMap。
Iterator和Foreach
现在foreach语法主要作用于数组,但是他也可以应用于所有的Collection对象。Collection之所以能够使用foreach是由于继承了Iterator这个接口
foreach循环最终是转换成 for (Iterator it=iterator;iterators.hasNext();)只不过jdk帮我们隐藏我们无法查看
如果对集合有删除操作,请记住一定要加上iterator.next()
Collections和Arrays
Collections.addAll和Arrays.asList
总结
数组是将数字和对象联系起来,它保存明确的对象,查询对象时候不需要对查询结果进行转换,它可以是多维的,可以保存基本类型的数据,但是数组一旦生成,其容量不能改变。所以数组是不可以直接删除和添加元素。
Collection保存单一的元素,而Map保存相关联的值键对,有了Java泛型,可以指定容器存放对象类型,不会将错误类型的对象放在容器中,取元素时候也不需要转型。而且Collection和Map都可以自动调整其尺寸。容器不可以持有基本类型。
像数组一样,List也建立数字索性和对象的关联,因此,数组和List都是排好序的容器,List可以自动扩容
如果需要大量的随机访问就要使用ArrayList,如果要经常从中间插入和删除就要使用LinkedList。
各种Queue和Stack由LinkedList支持
Map是一种将对象(而非数字)与对象相关联的设计。HashMap用于快速访问,TreeMap保持键始终处于排序状态,所以不如HashMap快,而LinkedHashMap保持元素插入的顺序,但是也通过散列提供了快速访问的能力
Set不接受重复的元素,HashSet提供最快的访问能力,TreeSet保持元素排序状态,LinkedHashSet以插入顺序保存元素。
为什么JUC需要提供并发容器?
多数容器类都是非线程安全的,即使部分容器是线程安全的,由于使用sychronized进行锁控制,导致读/写均需进行锁操作,性能很低。
java collection framework可以通过以下两种方式实现容器对象读写的并发控制,但是都是基于sychronized锁控制机制,性能低:
使用sychronized方法进行并发控制,如HashTable 和 Vector。以下代码为Vector.add(e)的java8实现代码:
1
2
3
4
5
6public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}使用工具类Collections将非线程安全容器包装成线程安全容器。以下代码是Collections.synchronizedMap(Map<K,V> m)将原始Map包装为线程安全的SynchronizedMap,但是实际上最终操作时,仍然是在被包装的原始map上进行,只是SynchronizedMap的所有方法都加上了synchronized锁控制。
1
2
3public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
return new SynchronizedMap<>(m); //将原始Map包装为线程安全的SynchronizedMap
}
JUC并发容器
CAS
CAS是一种无锁的非阻塞算法,全称为:Compare-and-swap(比较并交换),大致思路是:先比较目标对象现值是否和旧值一致,如果一致,则更新对象为新值;如果不一致,则表明对象已经被其他线程修改,直接返回
1 | function cas(p : pointer to int, old : int, new : int) returns bool { |
ConcurrentHashMap
ConcurrentHashMap实现了HashTable的所有功能,线程安全,但却在检索元素时不需要锁定,因此效率更高。
ConcurrentHashMap的key 和 value都不允许null出现。原因在于ConcurrentHashMap不能区分出value是null还是没有map上,相对的HashMap却可以允许null值,在于其使用在单线程环境下,可以使用containKey(key)方法提前判定是否能map上,从而区分这两种情况,但是ConcurrentHashMap在多线程使用上下文中则不能这么判定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public class ConcurrentHashMap_Test{
public static void main(String[] args) {
Map<String, String> map = new ConcurrentHashMap<>();
// Map<String, String> map = new ConcurrentSkipListMap<>();
// Map<String, String> map = new HashMap<>(); // Collections.synchronizedXXXX
Random r = new Random();
Thread[] ths = new Thread[100];
CountDownLatch latch = new CountDownLatch(ths.length);//阀门,阀门计数器
long start = System.currentTimeMillis();
for(int i=0;i<ths.length;i++) {
ths[i] = new Thread(()->{
for(int j=0;j<10000;j++) map.put("A" + r.nextInt(1000), "A" + r.nextInt(1000));
latch.countDown();
});
}
Arrays.asList(ths).forEach(t->t.start());
try{
latch.await();
} catch(Exception e) {
}
long end = System.currentTimeMillis();
System.out.println(end - start);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin,当前hash对应的bin(桶)还不存在时,使用cas写入; 写入失败,则再次尝试。
}
else if ((fh = f.hash) == MOVED) //如果tab[i]不为空并且hash值为MOVED,说明该链表正在进行transfer操作,返回扩容完成后的table
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) { // 加锁保证线程安全,但不是对整个table加锁,只对当前的Node加锁,避免其他线程对当前Node进行写操作。
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { //如果在链表中找到值为key的节点e,直接设置e.val = value即可
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e; //如果没有找到值为key的节点,直接新建Node并加入链表即可
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { //如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) //如果节点数大于阈值,则转换链表结构为红黑树结构
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); //计数增加1,有可能触发transfer操作(扩容)
return null;
}在jdk1.8中主要做了2方面的改进
改进一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。
改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。
另外,在其他方面也有一些小的改进,比如新增字段 transient volatile CounterCell[] counterCells; 可方便的计算hashmap中所有元素的个数,性能大大优于jdk1.7中的size()方法。
ConcurrentLinkedQueue
ConcurrentLinkedQueue使用链表作为数据结构,它采用无锁操作,可以任务是高并发环境下性能最好的队列。
ConcurrentLinkedQueue是非阻塞线程安全队列,无界,故不太适合做生产者消费者模式,而LinkedBlockingQueue是阻塞线程安全队列,可以做到有界,通常用于生产者消费者模式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public class TicketSeller{
static Queue<String> tickets = new ConcurrentLinkedQueue<>();
static{
for(int i=0;i<1000; i++) tickets.add("ticket :" + i);
}
public static void main(String[] args) {
for(int i=0;i<10;i++) {
new Thread(()->{
while(true){
String s = ticket.poll();//同步,从头开始获取
if(s==null) break;
else System.out.println("sale:" + s)
}
}).start();
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32/**
* 不断尝试:找到最新的tail节点,不断尝试想最新的tail节点后面添加新节点
*/
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) { //不断尝试:找到最新的tail节点,不断尝试想最新的tail节点后面添加新节点。
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time //t引用有可能并不是真实的tail节点的引用,多线程操作时,允许该情况出现,只要能保证每次新增元素是在真实的tail节点上添加的即可。
casTail(t, newNode); // Failure is OK. 即使失败,也不影响下次offer新的元素,反正后面会试图寻找到最新的真实tail元素
return true;
}
// Lost CAS race to another thread; re-read next CAS竞争失败,再次尝试
}
else if (p == q) //遇到哨兵节点(next和item相同,空节点或者删除节点),从head节点重新遍历。确保找到最新的tail节点
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q; //java中'!='运算符不是原子操作,故使用t != (t = tail)做一次判定,如果tail被其他线程更改,则直接使用最新的tail节点返回。
}
}CopyOnWriteArrayList
CopyOnWriteArray 设计了CopyOnWriteArrayList和CopyOnWriteArraySet
CopyOnWriteArrayList提供高效地读取操作,使用在读多写少的场景。CopyOnWriteArrayList读取操作不用加锁,且是安全的;写操作时,先copy一份原有数据数组,再对复制数据进行写入操作,最后将复制数据替换原有数据,从而保证写操作不影响读操作。
1
2
3public class CopyOnWriteListTest{
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();
/**
* Sets the array.
*/
final void setArray(Object[] a) {
array = a;
}
/**
* Gets the array. Non-private so as to also be accessible
* from CopyOnWriteArraySet class.
*/
final Object[] getArray() {
return array;
}
public E get(int index) {
return get(getArray(), index);
}
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
//独占锁
final ReentrantLock lock = this.lock;
lock.lock(); //写 互斥 读
try {
Object[] elements = getArray();
int len = elements.length;
//复制一个新的数组newElements
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e; //对副本进行修改操作
setArray(newElements); //将修改后的副本替换原有的数据
return true;
} finally {
lock.unlock();
}
}
}在add操作中通过一个共享的ReentrantLock来获取锁,这样可以防止多线程下多个线程同时修改容器内容。获取锁后通过Arrays.copyOf复制了一个新的容器,然后对新的容器进行了修改,最后直接通过setArray将原数组引用指向了新的数组,避免了在修改过程中迭代数据出现错误。get操作由于是读操作,未加锁,直接读取就行。
缺点:
- 性能低下,会不断申请新的空间,同时会造成频繁的GC
- 数据一致性问题,修改中copy了新的数组进行替换,同时旧数组如果还在被使用,那么新的数据就不能被及时读取到,这样就造成了数据不一致
ConcurrentSkipListMap
ConcurrentSkipListMap内部使用跳表(SkipList)这种数据结构来实现,他的结构相对红黑树来说非常简单理解,实现起来也相对简单,而且在理论上它的查找、插入、删除时间复杂度都为log(n)。在并发上,ConcurrentSkipListMap采用无锁的CAS+自旋来控制。
SkipList(跳表)是一种随机性的数据结构,用于替代红黑树,因为它在高并发的情况下,性能优于红黑树。跳表实际上是以空间换取时间。
ConcurrentSkipListMap的实现就是实现了一个无锁版的跳表,主要是利用无锁的链表的实现来管理跳表底层,同样利用CAS来完成替换。
通过分析ConcurrentSkipListMap的put方法来理解跳表以及CAS自旋并发控制:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //查找前继节点
if (n != null) { //查找到前继节点
Object v; int c;
Node<K,V> f = n.next; //获取后继节点的后继节点
if (n != b.next) //发生竞争,两次节点获取不一致,并发导致
break;
if ((v = n.value) == null) { // 节点已经被删除
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n)
break;
if ((c = cpr(cmp, key, n.key)) > 0) { //进行下一轮查找,比当前key大
b = n;
n = f;
continue;
}
if (c == 0) { //相等时直接cas修改值
if (onlyIfAbsent || n.casValue(v, value)) {
"unchecked") V vv = (V)v; (
return vv;
}
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
z = new Node<K,V>(key, value, n); //9. n.key > key > b.key
if (!b.casNext(n, z)) //cas修改值
break; // restart if lost race to append to b
break outer;
}
}
int rnd = ThreadLocalRandom.nextSecondarySeed(); //获取随机数
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
while (((rnd >>>= 1) & 1) != 0) // 获取跳表层级
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
if (level <= (max = h.level)) { //如果获取的调表层级小于等于当前最大层级,则直接添加,并将它们组成一个上下的链表
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
else { // try to grow by one level //否则增加一层level,在这里体现为Index<K,V>数组
level = max + 1; // hold in array and later pick the one to use
"unchecked")Index<K,V>[] idxs = (
(Index<K,V>[])new Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
for (int j = oldLevel+1; j <= level; ++j) //新添加的level层的具体数据
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
// 逐层插入数据过程
splice: for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
if (--insertionLevel == 0)
break splice;
}
if (--j >= insertionLevel && j < level)
t = t.down;
q = q.down;
r = q.right;
}
}
}
return null;
}这里的插入方法很复杂,可以分为3大步来理解:第一步获取前继节点后通过CAS来插入节点;第二步对level层数进行判断,如果大于最大层数,则插入一层;第三步插入对应层的数据。整个插入过程全部通过CAS自旋的方式保证并发情况下的数据正确性。
ConcurrentLinkedQueue, ArrayBlockingQueue与LinkedBlockingQueue
三者区别与联系:
联系,三者都是线程安全的。
区别,就是并发和阻塞,前者为并发队列,因为采用 cas 算法,所以能够高并发的处理;后2者采用锁机制,所以是阻塞的。注意点就是前者由于采用cas算法,虽然能高并发,但cas的特点造成操作的危险性,怎么危险性可以去查一下cas算法(但一些多消费性的队列还是用的它,原因看下边使用场景中的说明)
后2者区别:
联系,第2和第3都是阻塞队列,都是采用锁,都有阻塞容器Condition,通过Condition阻塞容量为空时的取操作和容量满时的写操作第。
区别,第2就一个整锁,第3是2个锁,所以第2、第3的锁机制不一样,第3比第2吞吐量大,并发性能也比第2高。
后2者的具体信息:LinkedBlockingQueue是BlockingQueue的一种使用Link List的实现,它对头和尾(取和添加操作)采用两把不同的锁,相对于 ArrayBlockingQueue 提高了吞吐量。它也是一种阻塞型的容器,适合于实现 “消费者生产者” 模式。
ArrayBlockingQueue是对BlockingQueue的一个数组实现,它使用一把全局的锁并行对queue的读写操作,同时使用两个Condition阻塞容量为空时的取操作和容量满时的写操作。正因为LinkedBlockingQueue使用两个独立的锁控制数据同步,所以可以使存取两种操作并行执行,从而提高并发效率。而 ArrayBlockingQueue使用一把锁,造成在存取两种操作争抢一把锁,而使得性能相对低下。LinkedBlockingQueue可以不设置队列容量,默认为Integer.MAX_VALUE.其容易造成内存溢出,一般要设置其值。
使用场景总结:
适用阻塞队列的好处:多线程操作共同的队列时不需要额外的同步,另外就是队列会自动平衡负载,即那边(生产与消费两边)处理快了就会被阻塞掉,从而减少两边的处理速度差距,自动平衡负载这个特性就造成它能被用于多生产者队列,因为你生成多了(队列满了)你就要阻塞等着,直到消费者消费使队列不满你才可以继续生产。 当许多线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。
LinkedBlockingQueue 多用于任务队列(单线程发布任务,任务满了就停止等待阻塞,当任务被完成消费少了又开始负载 发布任务)
ConcurrentLinkedQueue 多用于消息队列(多个线程发送消息,先随便发来,不计并发的-cas特点)
多个生产者,对于LBQ性能还算可以接受;但是多个消费者就不行了mainLoop需要一个timeout的机制,否则空转,cpu会飙升的。LBQ正好提供了timeout的接口,更方便使用如果CLQ,那么我需要收到处理sleep
单生产者,单消费者用 LinkedBlockingqueue
多生产者,单消费者用 LinkedBlockingqueue
单生产者,多消费者用 ConcurrentLinkedQueue
多生产者,多消费者用 ConcurrentLinkedQueue对上边总结:
如消息队列,好多client发来消息,根据client发送先后放入队列中,先发送的就先放进来,然后由于队列是先进先出,是一个一个出来的,所以不涉及到线程安全问题,所以用LinkedBlockingqueue队列。比如还拿上边消息队列那个例子,由于队列是一个一个出来的,出来一个消息协议体就由线程池分配一个线程去处理这个消息体,这个消息体对于线程池来说谈不上共享不共享的问题,即不会多个线程去抢同一个消息体去执行,所以就不需要用线程安全的队列结构了;那假如一种情况,队列里仍然是一个一个的出来,但是出来的这个元素是 线程池共享的,即大家线程都需要用到这个从队列里出来的这个元素,也就是多消费者消费同一个东西这种情况,所以就要用线程安全的队列了,即ConcurrentLinkedQueue。