Ordinary Road


  • 首页

  • 标签

  • 分类

  • 归档

【源码学习】Java集合之ArrayList

发表于 2019-03-25 | 分类于 源码学习 | 阅读次数:

一、概述

本文不打算将ArrayList全部代码列出来(也着实没有什么必要和精力),打算就常用常见的几个功能进行分析学习(源码系列文章凡是没有特殊说明的均为jdk1.8版本)。主要内容如下:

  1. ArrayList的构造器
  2. 向ArrayList中添加元素
  3. 从ArrayList中获取元素
  4. 删除ArrayList中的元素
  5. 判断一个元素是否存在于ArrayList中
  6. ArrayList中用于遍历的迭代器

如果懒得一步一步的看的话,那么可以直接点击每一个章节中的小结看结论总结部分。

二、源码分析

2.1ArrayList的构造器

ArrayList有三种构造器,分别如下:

2.1.1ArrayList(int initialCapacity)

带有int类型参数的构造器

形如:

1
public List<String> s = new ArrayList<String>(20);

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ArrayList(int initialCapacity) {
//如果初始化大小>0
if (initialCapacity > 0) {
//初始化一个指定大小的Object类型数组
this.elementData = new Object[initialCapacity];
} else if (initialCapacity == 0) {
// ==0则初始化一个空的数组
this.elementData = EMPTY_ELEMENTDATA;
} else {
//不能初始化负数
throw new IllegalArgumentException("Illegal Capacity: "+
initialCapacity);
}
}

这里有几个基本的变量概念:

1
2
3
4
//可以理解为ArrayList的基本核心数据类型    
transient Object[] elementData; // non-private to simplify nested class access
//空数组
private static final Object[] EMPTY_ELEMENTDATA = {};

没错,既然是ArrayList,那么它的核心基本数据类型一定是一个Array,一个Object类型的Array,修饰成transient表示这个属性在用Java默认的序列化方式传输时,不可被序列化。所谓Java默认的序列化方式就是实现Serializable接口的这种形式。

介绍完elementData这个基本的参数概念之后,上边构造器的代码就如同注释所说,初始化一个指定了容量的Object类型的数组,如果容量为0则初始化一个空的数组,为负数则抛出异常。

2.1.2ArrayList()

无参数构造器,我一般常用这种形式,因为一般情况下我并不是很确认我需要初始化一个多大容量的构造器。

1
2
3
4
5
6
/**
* Constructs an empty list with an initial capacity of ten.
*/
public ArrayList() {
this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
}

无参构造器就是初始化一个DEFAULTCAPACITY_EMPTY_ELEMENTDATA默认大小的Object数组,那么我们看一下这个DEFAULTCAPACITY_EMPTY_ELEMENTDATA是多大呢?

1
2
3
4
5
6
/**
* Shared empty array instance used for default sized empty instances. We
* distinguish this from EMPTY_ELEMENTDATA to know how much to inflate when
* first element is added.
*/
private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA = {};

哦豁,仍然是一个空的数组。但是我们观察注释可以发现(上边两段代码的注释),为了与EMPTY_ELEMENTDATA进行区分,这里在添加第一个元素的时候,会扩容为10,也就是初始化了一个大小为10的Object类型数组。具体扩容的事儿后边再说。

2.1.3ArrayList(Collection<? extends E> c)

带有Collection集合元素参数的构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ArrayList(Collection<? extends E> c) {
//将集合元素转换成数组
elementData = c.toArray();
//如果大小不等于0
if ((size = elementData.length) != 0) {
// c.toArray might (incorrectly) not return Object[] (see 6260652)
//如果elementData的类型不为Object数组类型 则重新拷贝
if (elementData.getClass() != Object[].class)
elementData = Arrays.copyOf(elementData, size, Object[].class);
} else {
// replace with empty array.
//==0返回一个空的数组
this.elementData = EMPTY_ELEMENTDATA;
}
}

这里有一个size变量,表示的是当前集合中元素的个数,并不是集合的容量。这段带有Collection参数的构造器可以理解为初始化一个拥有一些元素的集合。这里为什么在toArray()之后又判断了一次elementData是不是Object类型的数组呢?源码中有一行注释:

1
// c.toArray might (incorrectly) not return Object[] (see 6260652)

译为c.toArray 有可能不会返回一个Object类型的数组,这是jdk之前版本源码的一个bug,所以这里进行一个判断,如果类型变了的话就采用拷贝的方法复制入参数组。

2.1.4小结

ArrayList底层的基本数据类型是数组,它提供了三种构造器,一种是无参的(它在添加第一个元素的时候会进行数组扩容),一种是指定数组初始化长度的,一种是指定了初始化内容的。通常情况下我们建议使用第二种指定数组初始化长度的,这样可以节省空间,也可以避免数组扩容降低效率。

2.2向ArrayList中添加元素

2.2.1boolean add(E e)

向集合中添加一个E类型的元素,添加成功之后返回true

1
2
3
4
5
6
7
public boolean add(E e) {
//判断数组容量是否够存
ensureCapacityInternal(size + 1); // Increments modCount!!
//将元素插入数组下一个空缺位置 然后计数+1 (因为下标从0开始 而计数从1开始)
elementData[size++] = e;
return true;
}

这里看上去比较简单了,就是判断容量是否够用,然后添加元素。这里我们需要把ensureCapacityInternal这个方法展开,里边有一些关于ArrayList比较核心的内容。

2.2.1.1ensureCapacityInternal(int)

这个方法用来判断数组容量是否够存,即是否需要扩容

1
2
3
4
5
6
7
8
9
private void ensureCapacityInternal(int minCapacity) {
//判断数组是否是通过无参构造函数进行的初始化并且是第一次添加元素
if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
//取最大值 根据条件 如果进入到这里 最大值必然为 DEFAULT_CAPACITY
minCapacity = Math.max(DEFAULT_CAPACITY, minCapacity);
}
//判断数组是否需要扩容
ensureExplicitCapacity(minCapacity);
}

这里有个细节的地方在前文说到,当数组被无参构造函数初始化时,如同DEFAULTCAPACITY_EMPTY_ELEMENTDATA注释所说,在第一次添加元素的时候,数组会被扩容到大小为10的容量。这里的if判断,如果条件成立,则表示当前数组处于刚被无参构造函数初始化完,并没有加入元素。所以方法入参的minCapacity必然等于0+1也就是1,那么Math.max()方法返回的最大值也就必然是DEFAULT_CAPACITY,那么我们看下这个DEFAULT_CAPACITY值

1
2
3
4
/**
* Default initial capacity.
*/
private static final int DEFAULT_CAPACITY = 10;

好,这回对上了,无参构造函数第一次新增元素的时候被扩容成10个大小。

2.2.1.2ensureExplicitCapacity(int)

这个方法用来判断是否需要进行扩容

1
2
3
4
5
6
7
8
9
10
private void ensureExplicitCapacity(int minCapacity) {
//修改次数+1
modCount++;

// overflow-conscious code
//扩容条件 当添加完新元素之后的大小比数组当前的长度(不是已有元素长度,是初始化长度即包括未填充元素长度,数组已填充长度为size)大,则扩容
if (minCapacity - elementData.length > 0)
//扩容
grow(minCapacity);
}

这里又出现了一个基本的全局变量,这个变量是ArrayList的基类AbstractList的变量,用来记录修改的次数。

1
2
//表示ArrayList修改的次数    
protected transient int modCount = 0;

方法的逻辑就是先记录一下修改次数,如果入参的数组长度(即添加完新元素之后的最大长度)比数组当前的长度(包括未填充元素的地方,已填充的长度是size)大,那么就进行扩容。

2.2.1.3grow(int)

这个方法就是扩容的方法,可见如果我们能确定ArrayList元素的个数,那么直接初始化会省事的多,因为避免了扩容的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void grow(int minCapacity) {
// overflow-conscious code
//数组当前的长度(包括未填充的 这里最后注明一次 后边不写了)
int oldCapacity = elementData.length;
//扩容之后新数组的长度 = 现在数组的长度 + 现在数组长度除以2(这里用了>>运算,理论上要比/运算快的多) 即扩容之后的新数组长度为原来数组长度的1.5倍
int newCapacity = oldCapacity + (oldCapacity >> 1);
//如果扩容1.5倍仍然比不上入参的数组大小 即新增元素之后的数组大小 那么就将扩容之后的长度变更成添加新元素之后的数组大小
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
//如果新数组容量比MAX_ARRAY_SIZE还大 那么就看一下添加元素之后的大小是不是比MAX_ARRAY_SIZE大
//因为我们不可能无限扩容,我们指定了最大的长度为int最大值-8
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// minCapacity is usually close to size, so this is a win:
//扩容拷贝
elementData = Arrays.copyOf(elementData, newCapacity);
}

整个方法的注释已经写在源码中,先获取现在的数组大小,然后扩容至1.5倍现在数组大小为新数组大小,然后判断这个新数组大小和添加完当前元素之后的数组大小哪个大,再判断这个新数组大小与MAX_ARRAY_SIZE比哪个大。可以看下MAX_ARRAY_SIZE的值为int的最大长度-8,emmm已经很大了撒。

1
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

如果扩容之后比MAX_ARRAY_SIZE值大,那么我们就看一下添加元素之后的大小是多大。最后调用Arrays类的copyOf方法进行数组拷贝扩容。

2.2.1.4hugeCapacity(int)

这里的入参是添加当前元素之后的大小,因为目前情况下,扩容之后的大小已经比指定的数组最大长度MAX_ARRAY_SIZE大了,所以我们有必要看一下添加完当前元素之后的长度是多少。

1
2
3
4
5
6
7
8
9
private static int hugeCapacity(int minCapacity) {
//容量已经超了 内存溢出
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
//如果添加完当前元素之后的数组比MAX_ARRAY_SIZE大,那么就取Int的最大值 ,否则就取MAX_ARRAY_SIZE,而不是取之前计算之后的比MAX_ARRAY_SIZE还要大的扩容1.5倍之后的那个值
return (minCapacity > MAX_ARRAY_SIZE) ?
Integer.MAX_VALUE :
MAX_ARRAY_SIZE;
}

这里相关内容已经进行了注释。

2.2.1.5copyOf(Object[],int)

这个方法是Arrays类的方法,具体内容不阐述了,看源码知道是用了System的arraycopy方法进行的拷贝,把原来的数组拷贝到扩容之后的数组中。arraycopy方法是一个Native方法

1
2
3
4
5
6
7
8
9
public static <T,U> T[] copyOf(U[] original, int newLength, Class<? extends T[]> newType) {
@SuppressWarnings("unchecked")
T[] copy = ((Object)newType == (Object)Object[].class)
? (T[]) new Object[newLength]
: (T[]) Array.newInstance(newType.getComponentType(), newLength);
System.arraycopy(original, 0, copy, 0,
Math.min(original.length, newLength));
return copy;
}

2.2.2add(int index, E element)

这个方法是向集合中指定位置添加元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void add(int index, E element) {
//检查插入的位置是否合法
rangeCheckForAdd(index);
//判断是否需要扩容
ensureCapacityInternal(size + 1); // Increments modCount!!
//因为在指定位置插入元素 所以后边的元素需要顺序后移(可以看出插入效率很低)
System.arraycopy(elementData, index, elementData, index + 1,
size - index);
//空出的位置插入指定元素
elementData[index] = element;
//已有元素长度+1
size++;
}

这个方法在指定位置插入指定元素,所以先要判断指定位置是否存在并合理,然后判断是不是需要扩容,然后将数组顺序向后移动,最后在空出的位置插入新元素之后已元素数组长度+1。

2.2.2.1rangeCheckForAdd(int)

private void rangeCheckForAdd(int index) {
    if (index > size || index < 0)
        throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}

这个方法用来检查指定的位置是否合法,这里需要注意一点是,插入的位置不能比当前数组长度大。然后就是不能是负数。

判断扩容的方法这里就不赘述了,与上述相同,同样是修改次数加1然后判断与当前集合大小比较。

2.2.2.2native void arraycopy(Object src, int srcPos,Object dest, int destPos,int length)

这个方法进行数组拷贝的,还是有必要说一下,当然是native方法,所以只说一下参数

1
2
3
4
5
6
7
8
//src:源数组
//srcPos:源数组下标(从哪里开始拷贝)
//dest:新数组
//destPos:新数组下标(拷贝到新数组的起始位置)
//length:拷贝的数组长度(即指从源数组中拷贝多少到新数组)
public static native void arraycopy(Object src, int srcPos,
Object dest, int destPos,
int length);

五个参数说明注释在上述代码中,这里要做的是从要插入的位置的元素开始顺序向后移动一个位置,因为要给新来的元素腾位置。所以说实际就等同于从当前位置开始到末尾整体向后移动一个位置。

2.2.3boolean addAll(Collection<? extends E> c)

这个方法是向集合中添加指定集合中的全部元素。可以理解为两个集合拼接。

1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean addAll(Collection<? extends E> c) {
//将集合转成Object类型数组
Object[] a = c.toArray();
//新来了多长的数组
int numNew = a.length;
//判断新来这么多元素之后需不需要扩容
ensureCapacityInternal(size + numNew); // Increments modCount
//把新来的数组复制到当前集合后边
System.arraycopy(a, 0, elementData, size, numNew);
//长度相应变化
size += numNew;
return numNew != 0;
}

这个方法没有什么特殊的地方,都是一些基本操作,就是把新来的数组拷贝到原来数组后边。

2.2.4boolean addAll(int index, Collection<? extends E> c)

这个方法是在指定位置添加指定的集合元素。就是前边的整合版,因此也比较简单。核心内容就是一个native方法完成的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean addAll(int index, Collection<? extends E> c) {
//检查下标
rangeCheckForAdd(index);

Object[] a = c.toArray();
int numNew = a.length;
//判断是否需要扩容
ensureCapacityInternal(size + numNew); // Increments modCount

int numMoved = size - index;
//是否需要移动元素(这里不可能<0 可以=0即变成了上一个方法 在当前集合之后插入新的集合)
if (numMoved > 0)
System.arraycopy(elementData, index, elementData, index + numNew,
numMoved);
//将新插入的集合插到腾出的位置
System.arraycopy(a, 0, elementData, index, numNew);
size += numNew;
return numNew != 0;
}

这个方法没有什么特殊的,主要是多了一个判断是否需要移动元素,可能是因为大量的集合移动比较浪费时间吧。

2.2.5小结

ArrayList一共有4种add的方法,分别为添加指定元素,添加指定集合,添加指定元素/集合到指定的位置。添加之前需要判断容量是否够用,否则需要进行扩容,扩容为当前大小的1.5倍。其数组拷贝的核心方法是native方法System.arrayCopy(),每一次新增操作修改次数参数modCount都会自增。

2.3从ArrayList中获取元素

2.3.1E get(int index)

这个方法是从集合中获取单个元素,ArrayList也只能获取单个元素。

1
2
3
4
5
6
public E get(int index) {
//下标检查
rangeCheck(index);
//返回指定位置的元素
return elementData(index);
}

这里没有什么特别的地方,就是先判断位置是否合法,然后返回指定元素。

2.3.1.1rangeCheck(int)

下标合法性校验

1
2
3
4
private void rangeCheck(int index) {
if (index >= size)
throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}

这里即便不校验,数组本身也会抛出异常。

2.3.1.2elementData(index)

返回数组中指定下标的元素

1
2
3
E elementData(int index) {
return (E) elementData[index];
}

2.3.2小结

ArrayList提供了一个get(int index)方法返回指定下标位置的元素。

2.4从ArrayList中删除元素

2.4.1E remove(int index)

该方法删除指定下标上的元素,并返回该元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public E remove(int index) {
//检查下标合法性
rangeCheck(index);
//修改次数+1
modCount++;
//记录要删除的元素
E oldValue = elementData(index);
//要移动元素个数
int numMoved = size - index - 1;
//如果要移动的个数>0 即不是删除的最后一个元素
if (numMoved > 0)
//将要删除的元素下一个元素起后边的所有元素整体前移一位(整体拷贝)
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
//把最后一个元素赋值为null并把已有元素容量-1
elementData[--size] = null; // clear to let GC do its work
//返回删除的元素
return oldValue;
}

这个方法也都是一些基本操作,通过这个方法可以看到删除指定位置的元素实际上是把它后边的元素整体拷贝前移一位,然后将最后的元素赋值为null删除。

2.4.2 boolean remove(Object o)

这个方法是删除指定元素,并返回成功或者失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean remove(Object o) {
//ArrayList允许一个位置的元素值为null 即==null
if (o == null) {
//遍历数组找到第一个 == null的下标
for (int index = 0; index < size; index++)
if (elementData[index] == null) {
//删除该下标的元素
fastRemove(index);
return true;
}
} else {
//遍历数组找到第一个 equals o的下标
for (int index = 0; index < size; index++)
if (o.equals(elementData[index])) {
//删除该下标的元素
fastRemove(index);
return true;
}
}
//没有遍历到则返回false
return false;
}

这个方法是拿着给定的元素进行遍历,遍历分为两种,一种是==null,一种是不为null的时候用equals()方法。然后找到下标之后调用faseRemove(index)方法移除指定位置的元素。如果没有遍历到则返回false。

2.4.2.1fastRemove(int)

这个方法实际就是remove(int index)方法中的删除方法,1.7之前remove(int index)方法也是调用的fastRemove(),这里不知道为什么改了,但是方法内容还都是一样的。

1
2
3
4
5
6
7
8
9
10
11
12
private void fastRemove(int index) {
//修改次数+1
modCount++;
//要移动的元素个数
int numMoved = size - index - 1;
if (numMoved > 0)
//拷贝要删除的元素之后的元素块向前移动一位
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
//移除最后一个元素并更改大小
elementData[--size] = null; // clear to let GC do its work
}

2.4.3boolean removeAll(Collection<?> c)

这个方法删除ArrayList中指定集合中存在的元素

1
2
3
4
5
6
public boolean removeAll(Collection<?> c) {
//确认传入的集合不为空
Objects.requireNonNull(c);
//批量删除
return batchRemove(c, false);
}

先调用Objects类的判断方法判断入参是否为null,

2.4.3.1boolean batchRemove(Collection<?> c, boolean complement)

这个方法用来批量移除指定集合中存在的元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private boolean batchRemove(Collection<?> c, boolean complement) {
//当前ArrayList集合指向一个新的引用 (修改会改变原ArrayList)个人感觉这个步骤可能是为了写起来方便?不用一直this.elementData?
final Object[] elementData = this.elementData;
int r = 0, w = 0;
//是否修改成功
boolean modified = false;
try {
//遍历ArrayList集合,如果当前元素不在要删除的集合中 则把第r个元素填充到第w个位置 然后w+1 否则只有r+1 可以理解为w为遍历之后去掉删除的元素剩余的元素个数
for (; r < size; r++)
if (c.contains(elementData[r]) == complement)
elementData[w++] = elementData[r];
} finally {
// Preserve behavioral compatibility with AbstractCollection,
// even if c.contains() throws.
//正常情况下这里r一定==size 当c.contains出现异常时,r将!=size 这里为了保证一致性,将后边没遍历完的数组填充到w后 w为新数组的size 因此需要加上未遍历的个数
if (r != size) {
//把遍历剩下的数组拷贝w之后的位置
System.arraycopy(elementData, r,
elementData, w,
size - r);
w += size - r;
}
//如果有元素删除了
if (w != size) {
// clear to let GC do its work
//说明删除之后数组长度只到w 而实际长度为size 需要把多余的删除
for (int i = w; i < size; i++)
//删除长度为w之后的个数
elementData[i] = null;
//修改次数+删除次数
modCount += size - w;
//长度为w
size = w;
//修改成功
modified = true;
}
}
//返回修改成功还是失败
return modified;
}

这个方法有点长,相对有一点点绕。首先定义了一个r和w变量,r变量用来遍历原来的数组,w变量用来记录新数组的长度。入参的complement可以译为是否保存。然后看逻辑,首先是try内部的逻辑:

1
2
3
for (; r < size; r++)
if (c.contains(elementData[r]) == complement)
elementData[w++] = elementData[r];

用变量r遍历原来的数组,如果遍历道的元素在要删除的数组中,则不保留,否则保留。这里做一个示例逻辑演示一下:

假设我们现在的ArrayList数组1是1,2,3,4,5 共五个元素,我们要删除的集合数组2是3,5,6,7,8五个元素。那么我们遍历一下这段代码会出现什么情况。

  1. r=0,w=0时,因为元素1不包含在数组2中,所以执行 elementData[w++] = elementData[r],此时数组1为1,2,3,4,5 r =1,w=1
  2. r=1,w=1时,因为元素2不包含在数组2中,所以执行 elementData[w++] = elementData[r],此时数组1为1,2,3,4,5 r =2,w=2
  3. r=2,w=2时,因为元素3包含在数组2中,所以不执行后边的逻辑,此时数组1为1,2,3,4,5 r=3,w=2
  4. r=3,w=2时,因为元素4不包含在数组2中,所以执行 elementData[w++] = elementData[r],此时数组1为1,2,4,4,5 r =4,w=3
  5. r=4,w=3时,因为元素5包含在数组2中,所以不执行后边的逻辑,此时数组1为1,2,4,4,5 r=5,w=3

我们可以看到,实际上就是遍历如果不是要删除的元素就保留下来按顺序覆盖原来的数组。此时w=3,即新数组的size=3

接下来看finally中的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Preserve behavioral compatibility with AbstractCollection,
// even if c.contains() throws.
if (r != size) {
System.arraycopy(elementData, r,
elementData, w,
size - r);
w += size - r;
}
if (w != size) {
// clear to let GC do its work
for (int i = w; i < size; i++)
elementData[i] = null;
modCount += size - w;
size = w;
modified = true;
}

这里因为是用r遍历的数组size,所以理论上讲r一定是会等于size的。不过在循环中c.contains()方法可能会出现异常,因此可能会出现r!=size的情况,这时我们为了保持一致性,需要将未遍历的元素拷贝到w之后。然后修改w的长度加上未遍历的元素个数。如果w即新数组的长度不等于size,那么我们就需要把多出来的元素删除。也就是遍历w之后到size的那部分元素删除,然后修改次数加上删除的次数,最后新数组的size就是w了。

2.4.4小结

从ArrayList中删除元素有三种方法,一种是删除指定下标上的元素,一种是删除指定元素,最后一种是删除指定集合内存在的元素。删除操作与新增操作基本核心都是用了arraycopy方法移动数组块,因此理论上讲效率很低。而且remove(int)不需要遍历整个集合,而remove(Object)需要遍历集合寻找第一个符合条件的元素。remove支持移除元素==null的情况。removeAll的原理是遍历原数组找到不在要删除的集合中的元素保留下来然后删除多余的那一部分长度的数组,在调用c.contains()方法时可能出现异常。

2.5判断一个元素在集合中是否存在

2.5.1boolean contains(Object o)

该方法用于判断一个对象是否在集合中,存在返回true,否则返回false

1
2
3
4
public boolean contains(Object o) {
//返回该元素的下标 存在则>=0 否则返回-1
return indexOf(o) >= 0;
}

2.5.1.1int indexOf(Object o)

该方法返回元素在数组中的下标,不存在返回-1,支持null==判断

1
2
3
4
5
6
7
8
9
10
11
12
public int indexOf(Object o) {
if (o == null) {
for (int i = 0; i < size; i++)
if (elementData[i]==null)
return i;
} else {
for (int i = 0; i < size; i++)
if (o.equals(elementData[i]))
return i;
}
return -1;
}

2.5.1.2int lastIndexOf(Object o)

ArrayList还提供了获取匹配元素的最后一个下标,即上述方法倒叙for循环

1
2
3
4
5
6
7
8
9
10
11
12
public int lastIndexOf(Object o) {
if (o == null) {
for (int i = size-1; i >= 0; i--)
if (elementData[i]==null)
return i;
} else {
for (int i = size-1; i >= 0; i--)
if (o.equals(elementData[i]))
return i;
}
return -1;
}

2.5.2小结

ArrayList提供了contains方法判断是否存在指定对象元素,该方法通过indexOf方法返回第一个出现指定元素的下标,同时也提供了返回最后一个出现指定元素的下标。

2.6ArrayList中的迭代器

迭代器主要作用是用来遍历集合,基本使用形式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.chenxyt.threadTest;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class BugTest {
public static void main(String[] args) {
List<String> list = new ArrayList<String>();
list.add("chenxyt");
list.add("yang");
list.add("nina");
Iterator<String> it = list.iterator();
while (it.hasNext()) {
System.out.println(it.next());
}
}
}

获取一个集合的迭代器对象,然后通过这个迭代器进行对象的遍历。

2.6.1iterator()

这个方法返回一个迭代器对象

1
2
3
public Iterator<E> iterator() {
return new Itr();
}

Iterator是一个泛型接口,Itr是它的一个实现类。

2.6.2Itr

Itr是ArrayList的一个内部类,实现了迭代器Iterator接口,这里贴出类的具体内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* An optimized version of AbstractList.Itr
*/
//AbsractList.Itr的最佳版本 Itr原来的版本是AbstractList的内部类 1.8之后放到了ArrayList中
private class Itr implements Iterator<E> {
//记录该遍历的元素下标
int cursor; // index of next element to return
//记录上次返回的下标 如果没有返回默认-1
int lastRet = -1; // index of last element returned; -1 if no such
//记录修改的次数 --这个很重要 用来判断在迭代器遍历过程中ArrayList是否发生了修改
int expectedModCount = modCount;
//是否有下一个元素可以遍历 size为元素个数 cursor是下一个要遍历的下标 所以最大的cursor=size-1
//如果他俩相等了则表示遍历到最后了
public boolean hasNext() {
return cursor != size;
}
//返回下一个元素
@SuppressWarnings("unchecked")
public E next() {
//检查遍历过程是否发生了修改
checkForComodification();
//拿到当前遍历的元素下标
int i = cursor;
//下标只能<size才有数据
if (i >= size)
throw new NoSuchElementException();
//拿到ArrayList集合内部的数组对象
Object[] elementData = ArrayList.this.elementData;
//同理i也不能比length大(length==size) 因为前边判断过一次 所以这里如果比length大则表示
//在迭代器遍历的过程 ArrayList发生了list.remove()操作
if (i >= elementData.length)
throw new ConcurrentModificationException();
//记录下次要遍历的下标
cursor = i + 1;
//返回当前下标元素 并把返回的下标标记
return (E) elementData[lastRet = i];
}
//迭代器的移除上次返回的元素 即cursor-1 即 lastRet 移除元素只能移除已经遍历且返回的那个
public void remove() {
//没有返回过
if (lastRet < 0)
throw new IllegalStateException();
//检查移除过程是否发生了修改
checkForComodification();

try {
//调用集合的remove方法 移除lastRet下标上的元素
ArrayList.this.remove(lastRet);
//下次要遍历的下标-1(因为移除了遍历返回过的最末元素)
cursor = lastRet;
//重置返回标记
lastRet = -1;
//发生了移除操作 重新获取修改次数modCount
expectedModCount = modCount;
} catch (IndexOutOfBoundsException ex) {
throw new ConcurrentModificationException();
}
}

@Override
@SuppressWarnings("unchecked")
//jdk8提供的快速遍历方式 与foreach基本相同
public void forEachRemaining(Consumer<? super E> consumer) {
//判断传入的Consumer对象是否为空
Objects.requireNonNull(consumer);
//获取当前集合的size
final int size = ArrayList.this.size;
//遍历的位置 如果没遍历过则初始化为0
int i = cursor;
if (i >= size) {
return;
}
//获取集合的数组对象
final Object[] elementData = ArrayList.this.elementData;
//是否发生过修改
if (i >= elementData.length) {
throw new ConcurrentModificationException();
}
//遍历 并且外部未发生修改时 执行accept重写方法的操作 比如print、remove等
while (i != size && modCount == expectedModCount) {
consumer.accept((E) elementData[i++]);
}
// update once at end of iteration to reduce heap write traffic
//更新下一个要遍历的数组下标
cursor = i;
//重置lastRest --这里结合重写的accept方法会可能会发生异常 后边单独说
lastRet = i - 1;
//检查遍历过程中是否发生了修改
checkForComodification();
}
//检查遍历过程中是否发生了移除/插入操作
final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
}

相关代码上边都有注释,下面针对具体的方法进行详细的说明。

2.6.3boolean hasNext()

这个方法是判断当前迭代器是否还可以继续遍历下一个元素

1
2
3
4
public boolean hasNext() {
//cursor是当前迭代器下一次要遍历到的元素下标 没遍历一个元素 cursor+1 因为数组长度为size 而最 //大下标为size-1 所以当cursor=size时 即不可遍历了
return cursor != size;
}

这个方法的核心是返回是否可以遍历“下一个”元素。

2.6.4E next()

这个方法用来获取下一个元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SuppressWarnings("unchecked")
public E next() {
//判断在遍历的过程中 是否有元素发生过修改
checkForComodification();
//获取当前要获取的元素下标
int i = cursor;
if (i >= size)
throw new NoSuchElementException();
//获取当前数组
Object[] elementData = ArrayList.this.elementData;
//如果要遍历获取的下标比当前长度大 则说明发生过remove操作
if (i >= elementData.length)
throw new ConcurrentModificationException();
//下一个要遍历的下标
cursor = i + 1;
//返回数组中指定下标位置的元素 并记录本次返回的下标
return (E) elementData[lastRet = i];
}

这个方法用来返回迭代器中遍历的下一个元素。即与hasNext()配合使用,如果hasNext返回了true,则调用next方法可以返回下个元素。

2.6.5checkForComodification()

这个方法用来检查在遍历的过程中是否发生过修改

1
2
3
4
5
final void checkForComodification() {
//前边可以看到 在外部类中 如果发生了增删操作 modCount会++ 而我们在迭代器内部 初始化的时候会将expectedModCount的值赋值成modCount 只有在调用了迭代器内部的remove操作之后会变更一次这个值
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}

如注释所说,expectedModCount这个值在初始化的时候会赋值,在外部被修改的时候,这里是不会做变更,只有在迭代器内部调用了remove方法之后才会进行变更。这个方法抛出的异常ConcurrentModificationException是一个经典异常。后边章节针对这个异常具体说一下。

2.6.6remove()

迭代器自带了一个remove方法,删除对应数组上的元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void remove() {
//lastRet只有在进行过next()操作也就是遍历过元素之后才会>0 因此可以猜到这里remove操作remove的是之前遍历过的元素 而不能像外部类那样指定下标和元素
if (lastRet < 0)
throw new IllegalStateException();
//检查是否发生过改变
checkForComodification();
try {
//移除刚才遍历过的元素
ArrayList.this.remove(lastRet);
//删除操作会把后边的元素向前补一位 因此下一次遍历的仍然是刚才删除的那一位
cursor = lastRet;
//没有遍历过的元素了 下标改成-1
lastRet = -1;
//因为调用了remove方法 所以modCount改变了 这里重新赋值
expectedModCount = modCount;
} catch (IndexOutOfBoundsException ex) {
throw new ConcurrentModificationException();
}
}

2.6.7forEachRemaining(Consumer<? super E> consumer)

这个方法是jdk1.8之后的一个方法,传入一个Consumer对象用来遍历实现相关逻辑,与for-each功能基本相同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void forEachRemaining(Consumer<? super E> consumer) {
//判断传入的对象是否为空
Objects.requireNonNull(consumer);
//获取当前数组的长度
final int size = ArrayList.this.size;
//当前遍历的位置(可能在forEachRemaining方法之前调用过其它遍历方法)
int i = cursor;
//i比size大则不需要进行遍历了
if (i >= size) {
return;
}
//获取当前数组
final Object[] elementData = ArrayList.this.elementData;
//如果i比当前数组长度大 则说明发生过修改
if (i >= elementData.length) {
throw new ConcurrentModificationException();
}
//继续遍历当前迭代器 并且保证在外部未发生过修改的情况下 执行Consumer对象重写的accept方法
while (i != size && modCount == expectedModCount) {
consumer.accept((E) elementData[i++]);
}
// update once at end of iteration to reduce heap write traffic
//假设外部发生过修改 则上述while循环会在遍历完数组之前提前退出 因此需要将下一个要遍历的值赋给cursor
cursor = i;
//上一个返回的就是i-1 这里如果重写方法中有remove操作 会发生异常
lastRet = i - 1;
//检查是否发生了修改
checkForComodification();
}

2.6.8forEachRemaining()引发的异常

上个小节说到使用forEachRemaining()方法进行遍历时,可能会出现异常。这里通过一个demo分析为何会产生异常,同时也看下forEachRemaining()方法怎么使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.chenxyt.threadTest;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
public class BugTest {
public static void main(String[] args) throws InterruptedException {
ArrayList<String> list = new ArrayList();
for (int i = 0; i < 10; i++) {
list.add(String.valueOf(i));
}
Iterator iterator = list.iterator();
iterator.forEachRemaining(new Consumer() {
@Override
public void accept(Object o) {
System.out.println(o);
if (o.equals("3") ) {
System.out.println("remove");
iterator.remove();
}
}
});
}
}

如上,就是遍历然后在accept方法内部,完成迭代器的操作。这里使用迭代器进行remove()操作。

运行结果:

1554821208339

可以看到在执行remove之前抛出了异常-java.lang.IllegalStateException,根据堆栈信息可以看到是在remove方法抛出的异常。再回头看一下迭代器的remove方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
checkForComodification();

try {
ArrayList.this.remove(lastRet);
cursor = lastRet;
lastRet = -1;
expectedModCount = modCount;
} catch (IndexOutOfBoundsException ex) {
throw new ConcurrentModificationException();
}
}

可以看到这个异常是在判断lastRet<0之后返回的。我们知道lastRet是与next()方法配合使用的,每用next()方法返回一个遍历元素之后,lastRet赋值为返回的元素下标。而在forEachRemaining()方法中,遍历元素是通过while循环一个一个遍历的,这期间并没有修改lastRet的值,因此这里如果直接调用了remove()方法就会抛出java.lang.IllegalStateException.

2.6.9经典异常ConcurrentModificationException

这个异常不光在ArrayList中会出现,在其它使用迭代器的集合中也会出现。下面一个示例演示这个异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.chenxyt.threadTest;
import java.util.ArrayList;
public class BugTest {
public static void main(String[] args) throws InterruptedException {
ArrayList<String> list = new ArrayList();
for (int i = 0; i < 10; i++) {
list.add(String.valueOf(i));
}
for(String s:list){
System.out.println(s);
if("3".equals(s)){
System.out.println("remove");
list.remove("3");
}
}
}
}

这个demo通过一个for-each增强for循环完成了遍历并使用ArrayList外部类的remove(Object)方法删除指定元素。运行结果如下:

1554821894511

可以看到发生了java.util.ConcurrentModificationException,因为for-each增强for循环的原理实际就是迭代器hasNext()和next()方法的循环使用。而迭代器中exceptedModCount只有初始化的时候赋值为modCount和调用迭代器内部的remove()方法之后赋值为modCount,其它时候并没有改变。因此在遍历的过程中如果调用了ArrayList外部类的remove()方法会导致modCount发生改变,从而在进行next()方法checkForComodification()方法之后抛出这个异常。

通过javap -c 可以看到这个class文件的字节码中使用了迭代器中hasNext()和next()等方法

1554822461669

2.6.10删除重复字符会出现的陷阱

假设我们有个ArrayList数组,其中有两位连着的元素是一样的,我们想删除这两个元素。有如下示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.chenxyt.threadTest;
import java.util.ArrayList;
public class BugTest {
public static void main(String[] args) throws InterruptedException {
ArrayList<String> list = new ArrayList();
for (int i = 0; i < 5; i++) {
list.add(String.valueOf(i));
if(i == 3){
list.add(String.valueOf(i));
}
}
System.out.println("删除前集合元素");
for(String s : list){
System.out.println(s);
}
System.out.println("------");
for(int i = 0;i<list.size();i++){
if(list.get(i).equals("3")){
System.out.println("删除下标为:" + i + "的元素");
list.remove(i);
}
}
System.out.println("删除后集合元素");
for(String s : list){
System.out.println(s);
}
}
}

这个demo构造了一个带有重复元素“3”的ArrayList集合,然后不适用迭代器遍历删除重复的元素“3”,看一下运行结果:

1554823008905

哦豁,只删除了下标为3的元素,而重复的元素即下标为4的那个“3”没有被删除成功。这里出现这个情况的原因是因为ArrayList的remove操作会把删除位置后边的元素向前移动,原来下标为4的那个“3”在下标为3的那个“3”被删除之后向前移动了一位变成了下标3,这时候的i已经继续循环+1了,因此无法将后边重复的那个“3”删除。

2.6.11小结

ArrayList的迭代器还是相对重要些,主要用于遍历和删除操作。要注意exceptModCount的值,避免在使用迭代器遍历时调用外部的增删方法,以免发生ConcurrentModificationException异常。

2.7小结

  1. ArrayList是通过transient Object数组构造的一个容器,数组的特性就是查询快,可以达到O(1)时间复杂度,而新增删除代价比较大,因为涉及到了元素位置的移动。构造方法可以初始化指定长度的容器,也可以初始化指定元素的容器,默认的无参构造函数初始化空数组,在插入第一个元素之后会扩容为10
  2. 添加元素的时候会判断容量大小,添加完如果长度比当前数组的容量大,则扩容为当前容量的1.5倍,扩容之后使用native arraycopy方法进行数组拷贝,添加操作会修改modCount的值
  3. 获取元素直接调用get方法,返回指定下标的元素
  4. 删除元素调用fastRmove()进行删除,删除指定位置的元素或者指定元素,可以删除为null的元素,删除的过程是将要删除的元素后边一位到最后一位向前拷贝一位,然后删除最后的一位元素,删除操作会修改modCount的值
  5. contains方法返回一个元素是否在集合中,是通过indeOf()即遍历查找指定元素返回其下标与0比较,小于0则返回false不存在,否则即存在
  6. ArrayList中的迭代器用于遍历操作,在迭代器初始化的时候会将此时的modCount赋值给exceptCount,因此如果在遍历的过程中发生了外部类remove操作,修改了modCount则会抛出ConcurrentModificationException异常。
  7. jdk1.8中新增的forEachRemaining方法在遍历的过程是一个一个向后移动下标,没有像迭代器的hasNext/next操作返回lastRet值,因为调用迭代器内部的remove方法时会发生异常,因为lastRet可能为-1
  8. 不使用迭代器遍历ArrayList时,删除相邻重复元素不会成功,因为ArrayList.remove方法是把后边的元素前移了,重复元素的下标变了。

【源码学习】Java并发之CountDownLatch

发表于 2019-03-23 | 分类于 源码学习 | 阅读次数:

一、Demo演示

    在并发的场景中,常见的一个场景就是一个任务的执行要等待多个线程都执行完毕之后才执行。CountDownLatch可以很好的解决这个问题。

以下以多线程累加为背景,演示如何使用CountDownLatch

首先先看不使用CountDownLatch的时候

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.chenxyt.threadTest;
public class CountDownLatchDemo {

public static int i = 0;
public static void main(String[] args) {
for(int j=0;j<10;j++){
Thread thread = new Thread(){
@Override
public void run() {
// TODO Auto-generated method stub
add();
}
};
thread.start();
}
System.out.println(i);
}
public synchronized static void add(){
i++;
}
}

运行结果:

1553308547970

再一次运行:

1553308568992

    这里我们如果多次运行的话,会产生随机的结果。原因就是线程并不是按照我们代码看到的逻辑顺序执行的。循环创建的十个线程和主线程都有机会得到CPU的调度,当主线程被调度执行print代码时,有可能会有子线程还没有执行。因此我们需要使用一些措施来让主线程等待着子线程执行完再执行print代码。这时候CountDownLatch就登场了。

接下来演示CountDownLatch的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.chenxyt.threadTest;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
static int i = 0;
static CountDownLatch ctl = new CountDownLatch(10);
public static void main(String[] args) {
for(int j=0;j<10;j++){
Thread thread = new Thread(){
@Override
public void run() {
// TODO Auto-generated method stub
add();
}
};
thread.start();
}
try {
ctl.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(i);
}
public synchronized static void add(){
i++;
ctl.countDown();
}
}

运行结果:

1553308877746

    这回我们无论运行多少次,最终的结果都是10。我们先初始化了一个CountDownLatch对象,参数为10,然后每个线程执行完自己的任务之后,调用了对象的countDown()方法,主线程调用了对象的await()方法,最后就得到了预期的结果。下边分析如何实现的。

二、源码分析

这里我们通过分析源码来知道CountDownLatch的工作原理

2.1CountDownLatch(int)

CountDownLatch是通过构造器完成初始化的

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

    构造器参数是int类型变量,当入参小于0时会抛出非法参数异常。然后是调用了Sync的构造器,构造了Sync对象,并把入参继续下传。接下来我们看下Sync是什么。

2.1.1Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//获取共享资源 将共享资源state设置成count
Sync(int count) {
setState(count);
}

//查看当前共享资源个数
int getCount() {
return getState();
}

//重写获取共享资源方法
protected int tryAcquireShared(int acquires) {
//如果当前共享资源个数为0则返回1 否则返回-1
//即获取成功为1 失败为负1
return (getState() == 0) ? 1 : -1;
}
//重写释放共享资源方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
//自旋保证内部要执行的终态一定会成功
for (;;) {
//获取当前资源个数
int c = getState();
//如果资源为0 则返回释放失败 false
if (c == 0)
return false;
//资源个数-1
int nextc = c-1;
//cas操作将资源个数-1
if (compareAndSetState(c, nextc))
//成功则退出自旋 返回true
return nextc == 0;
}
}
}

    通过上边的代码我们可以看到,Sync是CountDownLatch的一个内部类,同时也是AbstractQueuedSynchronizer的一个子类。AQS前文有说过,是一个基于volatile共享变量state和FIFO双循环队列的多线程同步器框架,也就是说,Sync是我们自定义的一个同步器。自定义的同步器是重写了tryAcquire/tryRelease独占式或者是tryAcquireShared/tryReleaseShared共享式中的一组或全部,主要是通过对state的set/get/cas操作完成共享变量的获取和释放。

2.1.1.1Sync(int)

这里先只展开说一下Sync的构造函数,其它的函数下文再说,并且在上述代码中已经写了注释。

1
2
3
Sync(int count) {
setState(count);
}

    如同我们说的自定义同步器的功能一样,通过get/set/cas操作完成对state的修改。这里的构造函数是使用了基类AQS的setState(count)方法,将同步器共享资源的state的值设置成了count。

2.1.2小结

    综上所述,CountDownLatch的构造方法,就是共享式的获取了count个资源。通过内部类Sync继承了AQS并通过构造函数传参修改了共享变量的值。

2.2countDown()

接下来我们看一下Demo中每个线程执行完之后调用的countDown()方法是如何实现的。

1
2
3
public void countDown() {
sync.releaseShared(1);
}

    哦嚯,竟然如此简单。只调用了sync对象的releaseShared方法,并传参数1。内部类Sync的方法在上一节中已经给出,我们可以看到内部类中并没有releaseShared(int)这个方法,因此可以判定这个方法是AQS的。没错,之前学习AQS的时候说过,acquire/release 和 acqureShared/releaseShared分别是独占式和共享式获取/释放资源的最顶层入口。这里为了能够理解清楚,再重新列出基类的方法。

2.2.1releaseShared()

这个方法是基类AQS的方法

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
//调用自定义同步器的尝试释放资源方法
if (tryReleaseShared(arg)) {
//释放成功则尝试唤醒队列中等待的线程
doReleaseShared();
return true;
}
return false;
}

    这个方法先调用自定义同步器的tryRleaseShared(int)方法尝试释放资源,因为是共享式的,所以释放成功之后(不需要等待state=0)需要通知队列中等待的资源尝试获取资源。注意这里前边传过来的入参是1,即只释放一个资源(实际上这个参数没有用到)。

2.2.1.1tryReleaseShared(int)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
//自旋
for (;;) {
//获取当前state值
int c = getState();
//如果当前state=0 则表示资源都已经释放了
if (c == 0)
return false;
//将当前state值-1
int nextc = c-1;
//cas更新当前资源个数-1
if (compareAndSetState(c, nextc))
//更新成功则退出自旋返回true
return nextc == 0;
}
}

    这个方法主要完成的操作就是释放一个资源(文中的释放/获取对于state来说,获取表示state++,释放表示state–,可以反向理解为是对state来说),也就是使state的值-1,这个操作通过自旋CAS来保证一定会执行成功得到最后的结果。

2.2.1.2doReleaseShared()

该方法为基类AQS的方法

    由前边对AQS的学习知道,当我们释放了一个资源的时候,我们要尝试去通知队列中第一个等待的线程去获取资源。对于共享式和独占式的区别是独占式是在释放之后资源个数=0时才去通知,共享式是释放了就去通知。这里是共享式释放资源的通知。这个方法也是基类的方法,就如同学习AQS时说的那样,自定义同步器不需要管队列的操作,只需要去重写对资源的获取和释放并处理好返回值即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
//自旋
for (;;) {
//获取队列头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

这里相关内容不再继续展开了,主要的功能就是完成了对队列还活动着的线程进行了一个通知。

2.2.2小结

    通过对源码的分析,我们可以看到,每个线程执行了countDown()之后会对构造函数获取的state进行-1操作,执行完-1操作会通知队列中等待的线程尝试获取资源。这里我们也可以进行猜测,队列中等待的线程就是我们Demo中执行了await()方法等待执行print任务的主线程。

2.3await()

主线程为了等待子线程都执行完任务,调用了await()方法

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

可以看到这个方法是能够响应中断的。在响应了中断之后会抛出InterruptedException异常

2.3.1acquireSharedInterruptibly()

这个方法是先尝试获取一个资源,如果获取失败了就寻找安全等待点自旋等待通知。等待过程中可以响应中断。

该方法为基类AQS方法

1
2
3
4
5
6
7
8
9
10
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//被中断则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取资源
if (tryAcquireShared(arg) < 0)
//获取资源如果返回负数则寻找安全等待点
doAcquireSharedInterruptibly(arg);
}

2.3.1.1tryAcquireShared(int)

这个方法是自定义同步器重写的,也是在CountDownLatch中重写的我认为比较重要的一个方法。算是核心逻辑吧

1
2
3
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

    哦豁,核心逻辑竟然只有一句话。就是判断当前共享资源值是不是等于0,如果等于0则返回1,否则返回-1。为什么说这里是核心逻辑呢,因为CountDownLatch实现的功能就是初始化对象的时候将共享资源个数设置成N,然后N个线程执行完之后-1,主线程等待时候需要判断共享资源个数,如果为0则表示其他线程都已经退下了。可以继续执行了,否则就需要进入等待队列等待。等待的过程中可以响应中断。

2.3.1.2doAcquireSharedInterruptibly()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将线程构造成Node节点加入到队列 标记为共享类型
final Node node = addWaiter(Node.SHARED);
//是否失败
boolean failed = true;
try {
//自旋
for (;;) {
//获取前驱
final Node p = node.predecessor();
//如果前驱是头节点
if (p == head) {
//尝试获取资源(这个方法被重写了 实际并没有获取资源 而是检查资源个数是否为0 也就是上述的核心方法)
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果资源个数为0了把当前节点设置成头
setHeadAndPropagate(node, r);
//插队导致前边的头结点无效 帮助其GC
p.next = null; // help GC
//没有失败
failed = false;
return;
}
}
//循环遍历寻找一个安全等待点 即跳过队列排在前边无效的线程节点
if (shouldParkAfterFailedAcquire(p, node) &&
//自旋阻塞等待
parkAndCheckInterrupt())
//响应了中断抛出中断异常
throw new InterruptedException();
}
} finally {
if (failed)
//如果失败了取消获取资源过程
cancelAcquire(node);
}
}

    这个过程就是把自己加入到队列中并自旋等待的过程。等待过程中可以响应中断,这里会自旋判断自己是不是头节点了,与释放资源那里是遥相呼应的,因为资源释放之后会通知队列的头节点进行获取。内部的几个方法不再进行展开,相关AQS操作在AQS中已经阐述过。这里最主要的还是上边的说的核心方法,方法名字是获取资源,但是实际重写操作内部完成的是判断资源个数是否为0。

2.3.2小结

    综上我们看到await()方法的核心内容是查看当前资源个数是否为0,如果不为0则加入到等待队列中等待唤醒。等待过程可以响应中断。唤醒之后做了tryAcquireShared()操作,依旧是判断资源是否为0,而不是字面意义去获取资源。(因为这里不是进行资源竞争,而是等待资源全部释放就可以继续完成后续的工作了。)

2.4await(long timeout, TimeUnit unit)

1
2
3
4
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

    源码中还有个带timeout参数的await方法,顾名思义就是可以设置超时时间,比如主线程等待了1min之后子线程依然没有将state全部释放掉,那么就不等它了继续执行下边的方法。这里同样是结合了AQS的一些操作。具体就补展开阐述了。

三、总结

    CountDownLatch在诸如主线程需要等待一系列子线程返回之后才继续执行的场景中使用,当然这种场景不仅仅局限于使用CountDownLatch。CountDownLatch使用方法是先用构造器构造一个对象,对象入参为要执行批量任务的线程个数,其实也可以是一个线程执行多个批量任务。所以这里如果理解为线程个数不是太准确,因为可能一个线程执行了多个任务,然后就需要等待多个任务的返回结果。所以这里合理的解释是构造函数的参数为要等待批量任务执行完成的批量个数。构造之后,每个任务执行之后调用一次countDown()方法,等待执行最终任务的线程调用await()方法。其底层原理是结合了AQS同步器完成,进行构造的时候将同步器的共享资源state设置成了N,然后每个任务执行完之后调用了countDown()方法就将N-1,调用了await()方法的线程被加入到了线程等待队列,自旋尝试获取资源,这里的获取资源是指自旋判断共享资源state是否等于0,即全部任务结束了。等待过程中可以响应中断。CountDownLatch还提供了一个可以设置超时时间的await(timeout)方法,以避免长时间阻塞。

【源码学习】Java并发之AQS源码学习

发表于 2019-03-19 | 分类于 源码学习 | 阅读次数:

一、基本概念

    AQS全称AbstractQueuedSynchronizer,抽象式的队列同步器。干什么用的呢?AQS定义了一套多线程访问共享资源的同步器框架。为什么说是框架呢,因为它并不是直接实现了同步器功能,而是提供了一套基础的功能,许多同步类都依赖它,诸如ReetrantLock(可重入锁)、Semaphore(信号量)、CountDownLatch(多线程访问计数)

该框架结构如图:

png1

    它维护了一个volatile修饰的int类型变量state(表示共享资源)和一个双向的FIFO线程等待队列(同步器嘛,肯定是会有等待的噻,等待的线程放入等待队列),同步队列拥有指向头结点的head节点和指向尾节点的tail节点。volatile保证资源状态的更新可以对其它线程可见,具体细节不赘述。state表示共享资源,访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

    结合线程获取锁的场景可以理解state共享变量就是一个锁标记,有获取锁操作,有获取到了上锁操作(要释放的时候解锁操作)和基于CAS形式的上锁/解锁操作(该方法可以保证原子性)。

    AQS定义了两种资源共享方式,一种是Exclusive(读占式,只有一个线程可以占用如ReentrantLock),另一种是Share(共享式,可以被多个线程占用,如读锁,CountDownLatch,Semaphore)

    前文说到,AQS主要是提供一种并发访问资源的框架,其具体需要使用哪种共享方式需要根据不同的自定义同步器来决定。AQS的设计是基于模板方法模式的,也就是说使用者(自定义同步器)需要继承同步器AQS并重写相应的方法,然后将同步器AQS组合在我们的自定义同步组件中,并调用AQS提供的模板方法,该方法会调用我们重写的方法来完成相应的功能。

    自定义同步器需要实现的方法是共享资源state的获取与释放,至于线程等待队列的维护等一系列操作都由AQS实现好了,自定义同步器主要实现以下方法:

  • boolean tryAcquire(int):独占式尝试获取资源,成功返回true,失败返回false
  • boolean tryRelease(int):独占式尝试释放资源,成功返回true,失败返回false
  • int tryAcquireShared(int):共享式尝试获取资源,负数表示失败,0表示成功但没有剩余可用资源,正数表示成功且有剩余可用资源
  • boolean tryReleaseShared(int):共享式尝试释放资源,如果释放之后允许唤醒后续等待节点则返回true,否则返回false
  • isHeldExculsively():该线程是否独占资源,只有用到Condition才去实现它

    以ReentrantLock为例,共享资源state初始化为0,当线程A调用Lock.lock()方法之后,会调用tryAcqurie()方法获取独占锁并将state+1,后续其它线程再调用lock()方法时会报错,当前线程可以继续调用,state继续+1,这是重入锁的概念,获取了多少次锁就需要释放多少次锁,最终目的是保证state状态等于0,以便其它线程有机会可以获取到锁。

    以CountDownLatch为例,在没看源码之前,一直认为CountDownLatch是通过类似wait/notify的形式实现的。实际上对于CountDownLatch,当将任务分成N个线程执行时,state也被初始化为N,两个N相同,然后调用了await()方法的线程会阻塞直到state归零,其它子线程执行完对应任务之后,显示的调用countDown()方法之后,共享资源state会以CAS的方式减一,直到减为0之后可以unpark()主线程继续执行后续任务。

    通常情况下,自定义同步器都是独占或者共享中的一种,有特殊的就是ReentrantReadWriteLock(),实现了独占和共享两种方式。

  • 小结:AQS是抽象式的队列同步器,基本框架是维护了一个volatile类型的变量和FIFO双端链表队列主要是通过模板方法模式为其它自定义同步类提供并发下的队列等待等功能,至于获取资源和释放资源的实现由自定义同步器自己完成。AQS主要分为独占式和共享式两种,实现哪种自己决定,也可以都实现。

二、源码分析

    本章开始分析AQS源码,前文说到AQS提供了模板方法来供自定义同步器访问然后调用重写的方法。这里分析源码中提供的模板方法以及其它诸如等待队列的相关实现。(重写方法我们在这里也分析不了啊哈哈~)我们按照独享式acquire-release和共享式acquireShared-releaseShared的顺序进行分析

2.1acquire(int)

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

    此方法是独占模式下线程获取共享资源的顶层入口,如果获取到了资源则直接返回,否则进入等待队列,直至获取到资源为止。且这个过程忽略中断的影响,是不是与lock()相同了。获取到共享资源之后就可以执行临界区代码了。

方法流程如下:

  1. tryAcquire()尝试直接获取资源,获取到直接返回;
  2. addWaiter()将线程加入到等待队列的尾部,并标记为独占模式;
  3. acqurieQueued()队列中线程处理方法,不断尝试使队列中的线程可以获取资源,直到成功后返回。如果在等待过程中被中断过,则返回true,否则返回false;
  4. 如果线程在等待过程中被中断过,它是不响应的,直到成功返回之后,再进行自我中断,将中断补上。

    这里的设计利用了&&短路的方式简化了编码,首先&&左边调用tryAcquire()尝试直接获取资源,如果获取成功则tryAcquire()返回true,方法前加了!将&&左半边的结果置为false,通过&&短路的功能不继续执行后续的操作。其它细节问题我们按顺序梳理:

2.1.1tryAcquire(int)

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

    这个方法尝试获取共享资源,如果成功则返回true,否则返回false,emmm那么为什么这里描述的与代码不符呢。因为前边说到这个方法是需要在自定义同步器中被重写的,比如在ReentrantLock中重写该方法。因此这里源码只抛出了异常,具体不同自定义同步器如何重写就等到学习对应源码的时候再看啦。不过前边也说到,基本的资源操作只有get/set/CAS三种方式。这个需要被重写方法没有设计成abstract形式,是因为一般情况下独占式只需要重写独占式对应的tryAcquire和tryRelease方法,而共享式也仅重写共享式的两个方法,这里避免了每个模式都需要实现以下另一个模式的接口的情况。

2.1.2addWaiter(Node)

该方法用于将等待线程放入到队列尾端,并返回当前线程节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node addWaiter(Node mode) {
//新建一个Node类型的节点 mode是AQS模式,独占EXCLUSIVE;共享SHARED
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//尝试使用最快的方式直接放到队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//失败则使用另一种形式添加
enq(node);
return node;
}

    如上代码注释,创建一个Node类型的节点,然后尝试通过CAS设置队尾节点为当前节点,如果设置失败则通过enq()方法完成入队。

2.1.2.1Node

我们看一下Node的结构,Node是一个静态内部类,主要内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
static final class Node{
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

    Node节点是对每一个访问共享资源的线程的封装,主要包含了线程本身以及线程状态(是否阻塞、是否等待唤醒、是否被取消等等)。变量waitStatus表示当前线程被封装成Node节点的等待状态,共有四种取值,CANCELLED、SIGNAL、CONDITION、PROPAGATE:

  • CANCELLED:值为1,表示线程被取消,被取消了的线程不会参与资源获取,状态不会再改变,直至该节点被GC收回。
  • SIGNAL:值为-1,表示该节点如果执行完成之后,会唤醒其后续节点。
  • CONDITION:值为-2,表示该节点等待在Condition上,当其他线程的Condition调用了singal()方法后,该线程会从等待队列移到同步队列中,加入到同步状态的获取。
  • PROPAGATE:值为-3,表示下一次共享式同步状态获取会被无条件传播下去。
  • 默认值为0,表示初始化状态。

AQS判断状态时,当waitStatus>0时表示无效状态,waitStatus<0时表示有效状态。

2.1.2.2compareAndSetTail

该代码通过底层UNSFATE方式完成CAS操作。关于UNSFAE相关内容这里不做讨论。

1
2
3
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

2.1.2.3enq(Node)

该方法是另外一种设置队列尾节点的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(final Node node) {
//自旋进行CAS操作,直至成功变成尾节点
for (;;) {
Node t = tail;
//尾节点为空则表示队列为空,初始化一个头节点,该节点也为尾节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//否则的话就与前边快速流程相同
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

2.1.3acquireQueued(Node,int)

    我们通过tryAcquire()形式获取资源失败了,然后将当前线程放入到了等待队列中,接下来就是对等待队列中的线程进行处理操作了,是进行等待唤醒状态,还是可以尝试获取锁然后完成临界区操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
final boolean acquireQueued(final Node node, int arg) {
//是否失败(这里开始理解错了,不细心容易产生歧义)
boolean failed = true;
try {
//是否响应了中断
boolean interrupted = false;
//自旋操作
for (;;) {
//获取当前节点的前驱节点
final Node p = node.predecessor();
//如果前驱节点为head,那么就可以尝试获取锁了(这里可能是被前驱通知了,也可以)
if (p == head && tryAcquire(arg)) {
//如果获取成功则把当前节点置为头节点
setHead(node);
//当前节点跳过它的前驱变成头结点,原来的头结点也就是前驱就无效了,等待被GC回收
p.next = null; // help GC
//获取成功,则把失败标记置为false
failed = false;
//返回是否响应了中断
return interrupted;
}
//这里同样用到了&&短路的操作,首先判断获取锁失败之后是否可以进入等待唤醒状态,如果可以则等待被唤醒变判断是否响应了中断以便返回中断信息
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//避免在获取锁的过程中发生异常
if (failed)
cancelAcquire(node);
}
}

继续看一下shouldParkAfterFailedAcquire(Node,Node)方法

2.1.3.1shouldParkAfterFailedAcquire(Node,Node)

    这个方法是判断当前线程是不是在当前队列位置中就不动了,如前边所说,万一前边的线程取消了,或者是释放了,那么队列中的节点就要一点一点向前移动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驱节点的等待状态
int ws = pred.waitStatus;
//如前边介绍样,当前驱节点的状态为SIGNAL时,表示它结束之后会通知当前节点
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//大于0的状态只有取消一种,所以将当前节点前移直到前驱是有效节点处
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//除了SIGNAL之外的有效状态,尝试将其设置成SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//没有到可以等待唤醒的状态话继续外部的自旋操作
return false;
}

然后看下如果返回可以等待唤醒之后的代码也就是返回true之后&&右边的parkAndCheckInterrupt()

2.1.3.2parkAndCheckInterrupt()

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

这个方法调用LockSupport的park方法使当前线程等待,当等待结束之后检查是否发生了中断。

    至此,acquireQueued(Node,int)方法就梳理清楚了:首先是进行自旋操作,自旋内部一步一步的移动当前线程节点,直到可以获取共享资源,否则的话就移动到一个可以安全等待通知的节点等待唤醒,最后需要返回是否响应了中断,结合最外部的acquire(int)方法来看,需要判断是否进行了中断,因为需要自己弥补中断。

2.1.4小结

综上所述,acquire(int)的流程:

  1. 先通过tryAcquire()尝试获取资源,如果能获取到资源则直接处理临界区操作;
  2. 如果获取资源失败则调用addWaiter()将线程存储为Node节点放入等待队列队尾;
  3. 因为队列是FIFO形式,因此需要自旋判断当前节点的最佳等待节点等待唤醒,唤醒之后尝试获取锁并返回是否响应过中断;
  4. 如果响应过中断则自己弥补一下中断。

这个流程也就是Lock.lock()的流程,查看源码可以看见lock()方法只执行了一个acquire(1)

2.2release(int)

    上一小节说完了acquire,这里说它的反向操作,也就是释放资源。release(int)这个方法是独占式同步器释放资源的顶层入口,同样是通过get/set/cas操作共享资源完成,如果完全释放了资源即state=0时,将会唤醒等待队列中的线程来获取资源,即上边自旋操作中处于等待状态节点的线程会被唤醒,从而尝试获取资源退出自旋。

1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
//与获取资源时相同,尝试调用重写的同步器方法释放资源
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒等待队列里的下一个节点(当前进行释放的线程为头结点)
unparkSuccessor(h);
return true;
}
return false;
}

    这个方法判断资源是否成功释放是通过tryRelease()方法的返回值判断的,因此在自定义同步器中重写该方法时需要注意返回值的问题。

2.2.1tryRelease(int)

1
2
3
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

该方法前边说过,是需要由自定义同步器重写的,因此此处只抛出异常。

2.2.2unparkSuccessor(Node)

该方法通过unpark方式唤醒队列中的下一个线程节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
//获取当前节点状态(传入的节点为头节点,即当前即将释放资源的节点)
int ws = node.waitStatus;
//如果线程状态正常,则通过CAS的形式将其置为0 表示该线程节点要离开了(释放资源的线程节点)
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//获取下一个节点准备唤醒
Node s = node.next;
//如果下一个节点为空或者等待状态为取消态
if (s == null || s.waitStatus > 0) {
//从后向前遍历最后一个状态正常的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
//赋值即将唤醒的节点
s = t;
}
//唤醒线程
if (s != null)
LockSupport.unpark(s.thread);
}

    这个方法概括起来就是唤醒等待队列中最前边的没有放弃的线程,这里和acquireQueued()方法的自旋结合起来看,当线程唤醒,自旋里的parkAndCheckInterrupt()会返回,然后进入自旋初始判断状态,即判断当前节点是否为head,即使不为head也没有关系,因为前边遍历过了它前边的节点均已为放弃状态,因此通过shouldParkAfterFailedAcquire()之后,必然会到达头节点,然后获取资源,然后执行临界区方法!

2.2.3小结

    release(int)方法是独占式同步器的顶层入口,它会释放指定的资源,如果资源释放完毕即state=0,那么它会唤醒等待队列中的下一个节点获取同步资源。

2.3acquireShared(int)

    前边两节说的是独占式同步器获取/释放方式,接下来说共享式同步器获取/方式。这个方法是共享式线程获取共享资源的顶层入口。它获取成功直接返回,获取失败则将线程放入等待队列。获取过程中忽略中断。

1
2
3
4
5
6
public final void acquireShared(int arg) {
//调用重写的方法尝试获取指定个数资源
if (tryAcquireShared(arg) < 0)
//将线程放入等待队列
doAcquireShared(arg);
}

    同样是先通过重写的同步器方法tryAcquireShared()获取指定个数的资源,这个方法在AQS中定义好了返回值的语义,返回小于0表示获取失败,返回0表示获取成功但没有剩余资源,返回大于0表示获取成功并且还有剩余资源。获取失败之后会将线程放入等待队列。

2.3.1tryAcquireShared(int)

该方法为自定义同步器需要重写的方法,基类方法抛出异常信息

1
2
3
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

2.3.2doAcquireShared(int)

该方法将线程放入等待队列队尾

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void doAcquireShared(int arg) {
//创建Node节点 节点类型为 方法与独占式的相同 区别在节点模式变为SHARED
final Node node = addWaiter(Node.SHARED);
//是否失败标记
boolean failed = true;
try {
//中断标记
boolean interrupted = false;
//自旋操作
for (;;) {
//获取当前节点的前驱
final Node p = node.predecessor();
//如果前驱为head 也就是当前节点为第二个节点
if (p == head) {
//尝试获取指定个数资源
int r = tryAcquireShared(arg);
//如果还有剩余
if (r >= 0) {
//将head指向自己,因当前资源剩余 因此唤醒后边的线程
setHeadAndPropagate(node, r);
//辅助前驱进行GC
p.next = null; // help GC
//被中断过则自己弥补中断
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//前驱不为head 则寻找安全等待唤醒点 与独占模式相同
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

    通读这个方法的代码,与独占式基本相同。区别在于将自身中断的过程放在了方法内部。另外一个区别是因为是共享资源,所以当前线程如果发现资源有剩余时会唤醒下一个线程。

2.3.2.1setHeadAndPropagate(Node,int)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void setHeadAndPropagate(Node node, int propagate) {
//获取头节点
Node h = head; // Record old head for check below
//将当前节点设置成头结点
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
//如果资源有剩余或者当前头节点已经释放资源离开
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//唤醒下一个节点
if (s == null || s.isShared())
doReleaseShared();
}
}

    这个方法是共享式与独占式的主要区别,资源充足时会唤醒下一个节点。doRelaseShared()方法放在后边释放的时候说。

2.3.3小结

    可以看出独占式获取与共享式获取差别并不是很大,总的流程依然是先尝试通过重写的方法获取资源,获取不到就将线程加入到等待队列然后自旋直到前边节点通知可以获取时,如果获取之后资源有剩余,则唤醒后续可以获取的节点线程。

2.4releaseShared(int)

这个方法是共享式线程释放共享资源的顶层入口,流程比较简单,释放资源,唤醒后续节点

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
//释放共享资源
if (tryReleaseShared(arg)) {
//唤醒下一个节点
doReleaseShared();
return true;
}
return false;
}

    这里与独占式有一个区别,独占式是基于可重入来完成的,也就是必须当state=0完全释放之后才会通知,这里因为是共享式,其根本目的是支持并发访问,因此只要释放了资源成功就会唤醒后边的节点。

2.4.1tryReleaseShared(int)

这个跟前边一样,需要自定义重写

1
2
3
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

2.4.2doReleaseShared()

这个方法唤醒下一个节点获取资源,在获取资源时也用到了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
//自旋
for (;;) {
//获取头节点
Node h = head;
//头结点不为空并且还有后继
if (h != null && h != tail) {
//头结点线程状态
int ws = h.waitStatus;
//头结点是SIGNAL即释放需要通知后继节点
if (ws == Node.SIGNAL) {
//尝试将头结点状态置为0 失败重试
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后继节点 与独占式相同
unparkSuccessor(h);
}
//如果头结点状态为0 则尝试
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果head变化了 则退出自旋
if (h == head) // loop if head changed
break;
}
}

2.4.3小结

    总的来说,释放共享式资源与释放独占式资源类似,先调用重写方法释放,然后唤醒队列中的节点,区别在于共享式不需要等待state=0时就可以唤醒资源,只要剩余资源数大于待唤醒的节点就可以唤醒。所以我们在自定义tryReleaseShared(int)方法时可以自定义返回值。以便实现不同的功能。

三、总结

    这篇文章主要讲了抽象式队列同步器(AQS)的原理及底层源码。AQS是一个基于FIFO循环队列和volatile类型共享变量的多线程并发访问框架。其主要目的是为自定义同步器提供模板方法,包括独占式acquire/release方法,和共享式acquireShared/releaseShared方法,这四个方法是获取/释放共享资源的顶层方法。方法内部分别调用了tryAcquire/tryRelease和tryAcquireShared/tryReleaseShared四种方法,这四种方法是需要自定义同步器自己重写的。重写主要依据set/get/cas三种方式对共享资源进行操作。独占式一个共享资源只能被一个线程获取,支持重入,共享式支持多个线程访问共享资源,如ReentrantReadLock(),处于等待队列中的线程会通过自旋操作等待前边的线程节点将其唤醒以竞争共享资源。

后续有时间补一下流程图吧,写完翻看了一遍只有一张图。

【编程技巧】如何正确的使用布尔类型

发表于 2018-12-23 | 分类于 学习笔记 | 阅读次数:

一、场景描述

    在日常开发的过程中,我们经常需要在类中定义布尔类型的变量。比如在一个业务的执行过程中,设置一个标志变量,来判断业务是否执行成功。一般情况下,有如下四种定义形式。

1
2
3
4
5
6
7
private boolean success;

private boolean isSuccess;

private Boolean success;

private Boolean isSuccess;

那么在开发中,究竟应该使用哪一种形式呢?下文逐步进行分析。

二、示例分析

    上述四种形式主要有两部分的区别,第一部分是使用基本类型boolean还是使用包装类型Boolean;第二部分是使用success命名变量还是使用isSuccess。

使用succes还是isSuccess?

    在阿里巴巴开发手册中,关于POJO类中的布尔类型变量,有如下强制性的规定:

png1

(图片中Boolean应为boolean)

我们看下不同的变量在POJO类中有什么区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.example.demo.pojo;
public class Pojo1 {

private boolean success;

public boolean isSuccess() {
return success;
}

public void setSuccess(boolean success) {
this.success = success;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.example.demo.pojo;
public class Pojo2 {

private boolean isSuccess;

public boolean isSuccess() {
return isSuccess;
}

public void setSuccess(boolean success) {
isSuccess = success;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.example.demo.pojo;
public class Pojo3 {

private Boolean success;

public Boolean getSuccess() {
return success;
}

public void setSuccess(Boolean success) {
this.success = success;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.example.demo.pojo;
public class Pojo4 {

private Boolean isSuccess;

public Boolean getSuccess() {
return isSuccess;
}

public void setSuccess(Boolean success) {
isSuccess = success;
}
}

上边四个POJO类中的getter/setter方法均为IDEA编辑器自动生成。观察可以发现如下规律:

a.基本布尔类型boolean,getter/setter均为isXXX()和setXXX()的形式

b.包装布尔类型Boolean,getter/setter均为getXXX()和setXXX()的形式

    假设此时我们使用基本布尔类型boolean,那么进一步分析Pojo1和Pojo2代码的区别。可以发现,虽然Pojo1和Pojo2中分别定义了success和isSuccess,但是它们自动生成的getter/setter方法都是isSuccess()和setSucess()。

    Java Bean中关于getter/setter方法名的定义是有明确规定的。根据JavaBeans(TM) Specification规定,如果是普通类型的参数,命名为PropetyName,需要遵循如下的规范:

1
public <PropertyType> get<PropertyName>();public void set<PropertyName>(<PropertyType> a);

但是,对于布尔类型变量PropertyName则是另一套规范:

1
public boolean is<PropertyName>();public void set<PropertyName>(boolean m);

png2

    通过对照如上的规范,我们可以发现,按照规范中的约定,在Pojo2中定义的isSuccess对应的getter方法应该为isIsSuccess,而一般的编辑器都会默认生成为isSuccess()

    再看另一种情况,假设此时我们使用的是包装布尔类型Boolean,然后同样分析Pojo3和Pojo4代码的区别,可以发现同样的对于success和isSuccess两个变量,编辑器都自动生成了getSuccess()

显然,对于isSuccess变量自动生成的getter/setter方法可能会产生问题。

    在一般情况下,上述这种问题不会产生影响,但是在序列化和反序列化的过程中可能会产生致命的错误。

这里我们使用常用的JSON序列化举例,看看常用的fastjson、jackson和Gson之间的区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.example.demo.pojo;

import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;

public class PojoTest {
public static void main(String[] args) throws JsonProcessingException {
Pojo1 pojo1 = new Pojo1();
pojo1.setSuccess(true);
//使用fastjson进行序列化并输出
System.out.println("fastjson--->" + JSON.toJSONString(pojo1));
//使用Gson进行序列化输出
Gson gson = new Gson();
System.out.println("Gson--->" + gson.toJson(pojo1));
//使用jackson进行序列化输出
ObjectMapper objectMapper = new ObjectMapper();
System.out.println("jackson--->" + objectMapper.writeValueAsString(pojo1));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.example.demo.pojo;

import java.io.Serializable;

public class Pojo1 implements Serializable {

private static final long serialVersionUID = -804945564874861370L;

private boolean isSuccess;

public boolean isSuccess() {
return isSuccess;
}

public void setSuccess(boolean success) {
isSuccess = success;
}

public String getName(){
return "chenxyt";
}
}

    上述修改之后的Pojo1中只有一个isSuccess变量以及编辑器自动生成的getter/setter方法和我按照指定规则编写的getter方法。

上述代码运行结果:

png3

    fastjson和jackson分别序列化出了一个success和一个name并赋值,而Gson只序列化出一个isSuccess,可见不同的序列化方式会产生不同的结果。进一步分析结果可以看出,fastjson和jackson是通过反射遍历所有getter方法然后根据Java Bean命名规范而进行序列化的,它会认为这个POJO类中有一个success变量和一个name变量。

1
2
fastjson--->{"name":"chenxyt","success":true}
jackson--->{"name":"chenxyt","success":true}

而Gson则是通过反射遍历类中的属性,把其序列化成Json。

1
Gson--->{"isSuccess":true}

    那么我们使用fastjson序列化,然后使用Gson进行反序列化会产生什么结果呢?方才的name仅仅是为了验证三种序列化的方式差异,因此将其抛开来验证新的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.example.demo.pojo;

import java.io.Serializable;
import java.util.StringJoiner;

public class Pojo1 implements Serializable {

private static final long serialVersionUID = -804945564874861370L;

private boolean isSuccess;

public boolean isSuccess() {
return isSuccess;
}

public void setSuccess(boolean success) {
isSuccess = success;
}

@Override
public String toString() {
return new StringJoiner(", ", Pojo1.class.getSimpleName() + "[","]") .add("isSuccess=" + isSuccess) .toString();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.example.demo.pojo;

import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.gson.Gson;

public class PojoTest {
public static void main(String[] args) throws JsonProcessingException {
Pojo1 pojo1 = new Pojo1();
pojo1.setSuccess(true);
//使用fastjson进行序列化并输出
System.out.println("fastjson--->" + JSON.toJSONString(pojo1));
//使用Gson进行反序列化输出
Gson gson = new Gson();
System.out.println("Gson--->" + gson.fromJson(JSON.toJSONString(pojo1),Pojo1.class));
}
}

运行结果:

png4

    这和我们预期的结果完全不同,这是因为fastjson在通过遍历getter方法然后根据Java Bean命名规范生成JSON对象时,生成了{”success“:true},而Gson在进行反序列化的时候,遍历整个POJO类的属性只发现了isSuccess,因此将isSuccess反序列化并赋与默认值false。这在生产环境,绝壁是致命问题啊!!!

    接下来验证另外一种假设,同样是基于success和isSuccess的前提下,使用包装Boolean的情况。可以预测结果和上边的结果没有什么不同。

首先验证序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.example.demo.pojo;
public class Pojo4 {

private Boolean isSuccess;

public Boolean getSuccess() {
return isSuccess;
}

public void setSuccess(Boolean success) {
isSuccess = success;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.example.demo.pojo;

import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;

public class PojoTest {
public static void main(String[] args) throws JsonProcessingException {
Pojo4 pojo4 = new Pojo4();
pojo4.setSuccess(true);
//使用fastjson进行序列化并输出
System.out.println("fastjson--->" + JSON.toJSONString(pojo4));
//使用Gson进行序列化输出
Gson gson = new Gson();
System.out.println("Gson--->" + gson.toJson(pojo4));
//使用jackson进行序列化输出
ObjectMapper objectMapper = new ObjectMapper();
System.out.println("jackson--->" + objectMapper.writeValueAsString(pojo4));
}
}

运行结果:

png5

与使用基本布尔类型的结果一致。

然后验证反序列化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.example.demo.pojo;

import java.util.StringJoiner;

public class Pojo4 {

private Boolean isSuccess;

public Boolean getSuccess() {
return isSuccess;
}

public void setSuccess(Boolean success) {
isSuccess = success;
}
@Override
public String toString() {
return new StringJoiner(", ", Pojo4.class.getSimpleName() + "[","]") .add("isSuccess=" + isSuccess) .toString();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.example.demo.pojo;

import com.alibaba.fastjson.JSON;
import com.google.gson.Gson;

public class PojoTest {

public static void main(String[] args) {
Pojo4 pojo4 = new Pojo4();
pojo4.setSuccess(true);
//使用fastjson进行序列化并输出
System.out.println("fastjson--->" + JSON.toJSONString(pojo4));
//使用Gson进行反序列化输出
Gson gson = new Gson();
System.out.println("Gson--->" + gson.fromJson(JSON.toJSONString(pojo4),Pojo4.class));
}
}

运行结果:

png6

这里区别出现了!

使用Boolean还是boolean?

    如上我们验证了使用isSuccess在序列化和反序列化过程中会造成的异常情况。然后对比使用基本布尔类型boolean和包装布尔类型Boolean的不同,在对一个没有的属性进行反序列的时候,基本布尔类型会默认赋值false,而包装布尔类型会默认赋值null

在阿里巴巴开发手册中有如下规定:

png7

    这里建议我们使用包装类型,理由是包装类型在数据属性为空的时候会默认赋值为null,而基本数据类型在属性为空的时候则会赋值为false或者为0。比如常见的Double和double,Integer和int,对于null和0的不同,这在生产环境下很有可能会产生一些灾难性的异常情况。

    个人觉得,无论是null还是默认的false与0也好,从本质上讲无非是我们如何在代码中正确的对异常情况进行处理。毕竟代码中出现没有意义的null也未必见得就是一件好事。

三、总结

    在开发过程中,对于需要对业务进行判断成功与失败的标志变量,避免使用isXXX的命名形式。因为编辑器根据Java Bean的命名规范,会将其自动生成的getter/setter方法中的Is省略。从而在序列化与反序列化的过程中产生异常。

此外,对于基本布尔类型,与包装布尔类型,其实没有哪一种是绝对完美的,完美的是如何处理异常。

【设计模式】一:单例模式

发表于 2018-12-03 | 分类于 学习笔记 | 阅读次数:

一、场景问题

    考虑这样一个问题,在我们的项目中需要连接Mysql数据库,数据库连接相关配置信息都写在配置文件中,常用的配置文件有xml和properties格式,那么我们读取配置文件的时候应该怎么做呢?这里我们以properties格式的配置文件为例,在没有使用设计模式的前提下,我们通过Java中读取配置文件的方法将连接信息读取出来放在对象中,然后使用这个对象。

先写一个读取配置文件的类AppConfig:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.chenxyt.java.test;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class AppConfig {
//定义两个用来存储配置文件内容的字符串
private String parameterA;
private String parameterB;
//访问对象的私有数据域
public String getParameterA() {
return parameterA;
}
public String getParameterB() {
return parameterB;
}
//构造方法
public AppConfig(){
readConfig();
}
//读取配置文件并赋值给存储字符串
private void readConfig(){
//获取一个properties对象的引用
Properties p = new Properties();
//输入流
InputStream in = null;
try{
//输入流获取配置文件
in = AppConfig.class.getResourceAsStream("appConfig.properties");
//输入流加载到properties对象
p.load(in);
//将配置文件的内容赋值到成员变量
this.parameterA=p.getProperty("url");
this.parameterB=p.getProperty("port");
}catch(IOException e){
//读取配置文件异常
e.printStackTrace();
}finally{
try {
//发生异常之后也要关闭输入流所以写在finally块中
in.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

在AppConfig.java的目录下编写配置文件appConfig.properties

1
2
url=127.0.0.1
port=3306

编写一个用于测试读取配置文件的客户端类Client.java

1
2
3
4
5
6
7
8
9
package com.chenxyt.java.practice;
import com.chenxyt.java.test.AppConfig;
public class Client{
public static void main(String[] args) {
AppConfig ac1 = new AppConfig();
System.out.println("paramA:" + ac1.getParameterA());
System.out.println("paramB:" + ac1.getParameterB());
}
}

运行结果:

png1

    如上所示,利用基本的对象操作,完成了对配置文件的读取。接下来思考一个问题,如果项目中有很多地方需要获取配置文件,那是不是我们需要在多个地方都new出这个对象呢?如果配置文件资源内容过多,那么频繁大量的创建相同的对象,那么将是一个不小的开销。如下:

1
2
3
4
5
6
7
8
9
10
11
12
package com.chenxyt.java.practice;
import com.chenxyt.java.test.AppConfig;
public class Client{
public static void main(String[] args) {
AppConfig ac1 = new AppConfig();
AppConfig ac2 = new AppConfig();
System.out.println("paramA:" + ac1.getParameterA());
System.out.println("paramB:" + ac1.getParameterB());
System.out.println("paramA:" + ac2.getParameterA());
System.out.println("paramB:" + ac2.getParameterB());
}
}

运行结果:

png2

    我们每创建一个对象对象内部的私有数据源就要被使用,并且也占用了大量的内存。事实上对于AppConfig这种公用的类,在运行时获取一次资源就可以了,因为资源都是固定的。

所以上面的问题抽象出来就是,一个类在程序的运行过程中,只需要一个实例,应该怎样做。

二、解决方案

    上述问题的解决方案就是使用单例模式,单例模式的目的是只创造一个类的实例,但是可以多次访问,只不过每次访问的都是之前的实例。这一点有点类似static作用域。其次,对于类中的构造函数,理论上我们可以通过构造函数进行实例的创建,因此我们需要避免用户客户端可以通过构造函数创建实例的方式,也就是使用private作用域修饰构造方法。然后提供一个公用的方法作为接入点,获取单例。因此单例模式的主要实现思路就是使用static+private+新的方法完成。

Java中设计模式主要有两种,懒汉式和饿汉式

懒汉式单例模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.chenxyt.java.test;
public class Singleton{
 //定义一个存放单例对象的变量
 private static Singleton uniqueInstance = null;
 //私有化构造函数,保证实例个数
 private Singleton(){
  //---处理业务,给对象的私有域赋值
 }
 //加锁保证线程安全 提供公共的获取实例方法 设置成静态方法,保证不用对象就可以调用
 public synchronized static Singleton getInstance(){
  //懒汉式设计,如果实例不存在则初始化
  if(uniqueInstance==null){
   uniqueInstance = new Singleton();
  }
  return uniqueInstance;
 }
}

饿汉式单例:

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.chenxyt.java.test;
public class Singleton{
 //定义一个存放实例的变量
 private static Singleton uniqueInstance = new Singleton();
 //私有化构造函数,保证实例个数
 private Singleton(){
  //---处理业务,给对象的私有域赋值
 }
 //提供公共的调用方法 由于只返回实例,所以不存在线程不安全问题
 public static Singleton getInstance(){
  return uniqueInstance;
 }
}

    所谓的懒汉式,就是唯一的类实例只有当马上要使用的时候才会创建,而饿汉式则是比较着急的那种,在类加载的时候就已经创建了该类实例。

现在知道了单例设计模式,那么我们使用单例设计模式重写上边的示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.chenxyt.java.test;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class Singleton {
//先创建一个实例
private static Singleton uniqueInstance = new Singleton();

public static Singleton getInstance(){
//返回唯一的类实例
return uniqueInstance;
}
//私有化构造方法,收回创建实例的权限
private Singleton(){
readConfig();
}
//建立两个成员变量存储配置文件的内容
private String parameterA;
private String parameterB;
//获取私有成员变量的方法
public String getParameterA() {
return parameterA;
}
public String getParameterB() {
return parameterB;
}
//读取配置文件并赋值给存储字符串
private void readConfig(){
//获取一个properties对象的引用
Properties p = new Properties();
//输入流
InputStream in = null;
try{
//输入流获取配置文件
in = Singleton.class.getResourceAsStream("appConfig.properties");
//输入流加载到properties对象
p.load(in);
//将配置文件的内容赋值到成员变量
this.parameterA=p.getProperty("url");
this.parameterB=p.getProperty("port");
}catch(IOException e){
//读取配置文件异常
e.printStackTrace();
}finally{
try {
//发生异常之后也要关闭输入流所以写在finally块中
in.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

}

客户端的调用方式也要相应改变一下:

1
2
3
4
5
6
7
8
9
package com.chenxyt.java.practice;
import com.chenxyt.java.test.Singleton;
public class Client{
public static void main(String[] args) {
Singleton sl = Singleton.getInstance();
System.out.println("paraA:" + sl.getParameterA());
System.out.println("paraB:" + sl.getParameterB());
}
}

运行结果如下:

png3

    那么怎样验证单例模式呢?在《Java编程思想》中我们了解到了“==”运算符在比较两个对象的时候,实际上是比较两个对象的引用是否相同,也就是说当两个对象的引用相同的时候,这两个引用实际上是一个,指向同一个内存区域,也就是我们说的只有一个实例。那么我们测试下,在客户端代码中进行修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.chenxyt.java.practice;
import com.chenxyt.java.test.Singleton;
public class Client{
public static void main(String[] args) {
Singleton sl1 = Singleton.getInstance();
Singleton sl2 = Singleton.getInstance();
if(sl1==sl2){
System.out.println("单例模式成功!只产生了一个实例!");
}else{
System.out.println("单例模式失败!我们是两个不同的实例!");
}
}
}

运行结果:

png4

三、模式讲解

1.单例模式的功能

    单例模式是用来保证程序运行过程中只会产生这一个实例,并且提供一个可以供全局访问的点也就是getInstance()方法来获取这个实例。单例模式只关系实例的创建方式,不涉及具体的业务场景。

2.单例模式的作用范围

    由于单例模式的原理是控制类实例的创建,因此它的作用范围在一个虚机上,因为类的加载过程是在虚机上执行的。所以我们讨论的单例模式只针对单一的系统,不讨论在集群上的情况。同时,通过Java的反射机制也可以创建类的实例,这种情况我们也不考虑,姑且暂认为没有反射机制。

3.懒汉式的实现

    前边我们写了懒汉式的实现示例代码,下面我们分析一下这段代码的设计思路。

a.私有化构造方法:单例模式的核心是收回创建实例的权限,改由自己控制,因此首先一部就是设置构造函数为私有。

1
2
private Singleton(){
}

b.提供获取实例的方法:既然我们回收了创建实例的权限,那么就需要提供一个新的方法用来获取实例。

1
2
public Singleton getInstance(){
}

c.把获取实例的方法变成静态:上边提供了一个获取实例的方法,但是这个方法是实例方法,需要使用实例进行调用,而这个方法恰好是没有实例而创建实例的方法,因此需要将该方法设置成static域,使其可以通过类名.方法的形式进行调用。

1
2
public static Singleton getInstance(){
}

d.定义存储实例的属性:我们需要定义一个变量,用来存储获取的实例,并且将这个变量设置成static以便获取实例的方法可以对其进行访问。

1
private static Singleton instance = null;

e.实现控制实例的创建:我们在getInstance()方法中实现对实例的创建控制,如果存在则返回,不存在则重新创建一个然后返回。

1
2
3
4
5
6
public static Singleton getInstance(){
if(instance==null){
instance=new Singleton();
}
return instance;
}

4.饿汉式的实现

    饿汉式与懒汉式的区别在于,饿汉式在程序开始定义变量的时候就已经初始化了,然后在getInstance()方法中直接进行了返回。这里有一个很明显的区别在于

懒汉式:

1
private static Singleton instance = null;

饿汉式:

1
private static Singleton instance = new Singleton();

区别在于:饿汉式的存储变量用到了static的特性!其实static基本就符合了单例设计模式的思想,因为:

1.因为static变量在类加载的时候进行初始化,也就是只初始化一次!

2.多个实例的static变量会共享同一块内存区域,实际上还是只用了一个!

这不正是static要实现的功能吗?!

四、单例模式的延迟加载

    单例模式的懒汉式单例提现了延迟加载的设计思想。那么什么是延迟加载呢?通俗来说,就像懒汉式设计模式那样,在程序启动的时候不去加载资源或者数据,只有等到必须要用不用不行了的时候,才去加载资源或者数据。所以称作是“延迟加载”!这种方法在实际开发中应用较为广泛,因为它尽可能的节约了资源。懒汉式的延迟加载体现如下:

1
2
3
if(instance==null){
instance=new Singleton();
}

现在要使用instance实例了,看一下有没有,没有的话没办法了,只能创建了。

五、单例模式的缓存思想

    缓存思想也是程序设计中的一个常见的功能,简单的说就是某些使用频率较高,系统资源消耗过大的时候,我们可以将这些系统资源放在外部,比如硬盘、数据库中,这样当下次使用的时候就可以先从硬盘或者数据库中获取,如果没有再去内存中获取。这样大大的降低了系统的开销。这样说来跟延迟思想多少有点相似。是的,上述代码中,null实际上就起了一个简单的缓存作用,先判断null是否是对象,如果不是,则创建一个,然后赋值给null,这样下次null就是对象了。

缓存思想是一个典型的使用空间换时间的概念。我们使用Map作为简单的缓存来重新写一下懒汉式的单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.chenxyt.java.test;
import java.util.HashMap;
import java.util.Map;
public class Singleton{
//定义键值对的key
private final static String DEFAULT_KEY = "SingletonKey";
//定义用来缓存的map
private static Map<String,Singleton> map = new HashMap<String,Singleton>();
//私有化构造函数,保证实例个数
private Singleton(){
//---处理业务,给对象的私有域赋值
}
//提供公共的调用方法 由于只返回实例,所以不存在线程不安全问题
public Singleton getInstance(){
Singleton instance = map.get(DEFAULT_KEY);
//缓存中没有就新创建一个然后放到map中
if(instance == null){
instance = new Singleton();
map.put(DEFAULT_KEY, new Singleton());
}
return instance;
}
}

    上述代码中实际上就是用Map代替了原来的null,判断map对应的key是否有值,如果有则返回,没有就创建一个新的然后加到map中去。

单例模式有很多种写法,不管哪种写法其核心思想都是不变的,保证只有一个实例。

六、单例模式的优缺点

1.时间和空间

    比较少上面的代码,懒汉式是典型的时间换空间的设计,每次使用的时候都会判断是否有实例创建。当然如果一直没有人使用,那么会节约内存空间。

    饿汉式是典型的空间换时间,当类装载的时候就会创建实例,每次访问的时候无需判断直接返回实例节约了时间,但是如果一直没有人使用那么会占用系统空间。

2.线程安全

    这里简单说一下线程安全的概念,线程安全是指两个线程同时访问同一个代码区所产生的结果是否安全。显然,不加同步关键字synchronized的懒汉式是线程不安全的,因为两个线程同时访问getInstance方法时,可能会创建两个实例出来。具体来说就是,现在实例instance为null,A线程进入创建实例,创建过程还没有完成也就是还没有将null替代,这时B线程进入,发现instance还是null,于是B线程进入创建实例,等到程序执行完,AB线程都创建了实例。

    饿汉式是线程安全的,因为虚拟机会保证类只被加载一次,而在加载的过程是不会发生并发的。

    解决懒汉式的线程不安全问题可以在方法前边加上synchronized关键字以保证同一时间只有一个线程执行这个方法,此外还有两种方式

a.双重检查锁定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.chenxyt.java.test;
public class Singleton{
//定义一个用来存储变量的值
private static volatile Singleton instance = null;
//私有化构造函数,保证实例个数
private Singleton(){
//---处理业务,给对象的私有域赋值
}
//提供公共的调用方法 由于只返回实例,所以不存在线程不安全问题
public static Singleton getInstance(){
if(instance == null){
synchronized (Singleton.class) {
if(instance == null){
instance = new Singleton();
}
}
}
return instance;
}
}

    这里之所有再嵌套一个if判断,是因为假设高并发的情况A跟B都进入第一个if了,那么如果不判断最终还是会有可能创建两个实例。同时这里与其它示例的区别,instance被使用volatile修饰了。这是因为不被volatile修饰同样也会存在高并发下创建两个实例的情况。具体原因是可能会出现第一个线程创建完instance实例之后还没有来得及被第二个进入到if嵌套内部的线程发现,以至于第二个线程认为instance还没有被实例化,所以创建了重复的实例。使用volatile可以保证多线程情况下的资源可见性。具体分析链接:https://www.cnblogs.com/damonhuang/p/5431866.html

b.静态内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.chenxyt.java.practice;
public class Singleton {
 //定义静态内部类 只有在使用时才被加载
 private static class SingletonHolder{
  //由JVM控制线程安全
  private static Singleton instance = new Singleton();
 }
 //私有化构造方法
 private Singleton(){
  //---
 }
 //提供对外的获取实例的方法
 public Singleton getInstance(){
  return SingletonHolder.instance;
 }
}

    当第一次调用getInstance方法时,它第一次读取LazyHolder.INSTANCE,导致内部类LazyHolder得到初始化,而这个类被装载初始化的时候会初始化其静态域,因此创建了Singleton实例,由于是static的,所以只在类加载的时候实例化了一次。这里简单了解一下上述静态内部类方法的相关基础知识:

1.什么是类级内部类?

    静态内部类也称作类级内部类,顾名思义这个内部类是有static修饰的,如果没有static修饰的内部类则称作是对象级内部类。

2.类级内部类的地位

    类级内部类相当于外部类的static部分,地位与static域或者static方法相同,是一个独立的成员,它的对象与外部类的对象不存在任何依赖关系,因此可以直接创建,而对象级的内部类是绑定在外部对象的实例中。

3.类级内部类中可以定义静态方法,在静态方法中只能够引用外部类中的静态成员方法或者成员变量。

4.如第二条所说,类级内部类相当于其外部类的成员,只有在第一次使用到时才会加载。

    在了解下关于多线程中缺省同步的情况,正常情况下我们一般使用synchronized关键字加锁控制并发,但是有几种情况由JVM自己控制并发。

1.使用static修饰的域、方法、块在加载的时候;
2.访问final字段时;
3.创建线程之前创建对象时;
4.线程可以看见它要处理的对象时。

七、单例和枚举

    JavaSE5之后提供了一种新的数据类型-枚举。单元素的枚举已经成为了实现单例模式的最佳方法。是因为枚举本身也是一个功能齐全的类,它有自己的域和方法,因此是作为单例的基础。其次,enum是通过继承Enum类实现的,所以不能再继承其它的类,但是可以用来实现接口。此外enum类也不能被继承,因为反编译可以发现该类实际上是final类。enum没有public构造器,只有private构造器,这刚好符合了单例模式的思想。

枚举实现单例模式的基本语句如下:

1
2
3
4
5
6
7
package com.chenxyt.java.practice;
enum Singleton{
uniqueInstance;
public void doSomething(){
//===
}
}

模拟使用枚举单例模式创建数据库连接:

1
2
3
4
5
6
7
8
9
10
11
package com.chenxyt.java.practice;
public enum DataSourceEnum {
DATASURCE;
private DBConnection connection = null;
private DataSourceEnum(){
connection = new DBConnection();
}
public DBConnection getConnection(){
return connection;
}
}

数据库链接类:

1
2
3
4
5
package com.chenxyt.java.practice;

public class DBConnection {

}

测试类:

1
2
3
4
5
6
7
8
9
package com.chenxyt.java.practice;

public class Main {
public static void main(String[] args) {
DBConnection conn1 = DataSourceEnum.DATASURCE.getConnection();
DBConnection conn2 = DataSourceEnum.DATASURCE.getConnection();
System.out.println(conn1 == conn2);
}
}

运行结果:

png5

八、总结

    单例模式是较为常用的一种设计模式,掌握单例模式的应用场景以及掌握懒汉式、饿汉式的写法与区别,还有更高级别的使用内部类或者枚举形式的实现。同时也了解懒加载和缓存的设计思想。

【RabbitMQ】十:RabbitMQ与Spring 的整合

发表于 2018-11-23 | 分类于 学习笔记 | 阅读次数:

一、概述

    前面的文章中整理了常规项目下RabbitMQ实现各种通用消息队列的方式,一般的企业级项目,通常使用Spring框架来实现项目,本文主要讲述RabbitMQ与Spring的集成,通过一个简单的示例演示集成。

    示例:通过Spring管理项目,实现RabbitMQ的fanout类型交换机的消息队列,一个生产者Producer,一个fanout类型的交换机exchangeTest,两个队列queueTest和queueTest1以及两个消费者Consumer和Consumer1接收消息

png1

二、代码

首先是在pom文件中加入用于整合的依赖

1
2
3
4
5
6
<!--rabbitmq依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency

同时贴出完整的pom依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>smq</groupId>
<artifactId>smqp</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<!-- spring版本号 -->
<spring.version>3.2.8.RELEASE</spring.version>
<!-- log4j日志文件管理包版本 -->
<slf4j.version>1.6.6</slf4j.version>
<log4j.version>1.2.12</log4j.version>
<!-- junit版本号 -->
<junit.version>4.10</junit.version>
</properties>

<dependencies>
<!-- 添加Spring依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>

<!--单元测试依赖 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<!-- 日志文件管理包 -->
<!-- log start -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- log end -->

<!--spring单元测试依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>

<!--rabbitmq依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>

<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>1.1.0.Final</version>
</dependency>

<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>5.0.1.Final</version>
</dependency>

</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<targetPath>${basedir}/target/classes</targetPath>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<targetPath>${basedir}/target/resources</targetPath>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<version>2.1.1</version>
<configuration>
<warSourceExcludes>${warExcludes}</warSourceExcludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.4.3</version>
<configuration>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
</plugin>
<plugin>
<inherited>true</inherited>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

    然后新建rabbitMQ.xml文件,该文件是用来管理RabbitMQ的连接以及交换机、队列、消息监听等配置,相关说明已经在注释中描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
 <!--配置connection-factory,指定连接rabbit server参数 -->
 <rabbit:connection-factory id="connectionFactory"
  username="guest" password="guest" host="127.0.0.1" port="5672"/>
 <!--定义rabbit template用于数据的接收和发送 -->
 <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory"
     exchange="exchangeTest" />
 <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
 <rabbit:admin connection-factory="connectionFactory" />
 <!--定义queue -->
 <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
 <!-- 将queue与exchange进行fanout绑定 -->
 <rabbit:fanout-exchange name="exchangeTest" durable="true" auto-delete="false">
  <rabbit:bindings>
   <rabbit:binding queue="queueTest"></rabbit:binding>
  </rabbit:bindings>
 </rabbit:fanout-exchange>
  <!--定义queue1 -->
 <rabbit:queue name="queueTest1" durable="true" auto-delete="false" exclusive="false" />
 <!-- 将queue1与exchange进行fanout绑定 -->
 <rabbit:fanout-exchange name="exchangeTest" durable="true" auto-delete="false">
  <rabbit:bindings>
   <rabbit:binding queue="queueTest1"></rabbit:binding>
  </rabbit:bindings>
 </rabbit:fanout-exchange>
 <!-- 消息接收者 -->
 <bean id="messageReceiver1" class="com.cn.chenxyt.consumer.MessageConsumer1"></bean>
 <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory" concurrency="20" prefetch="1">
             <rabbit:listener queues="queueTest" ref="messageReceiver1"/>
    </rabbit:listener-container>
    <!-- 消息接收者2 -->
 <bean id="messageReceiver2" class="com.cn.chenxyt.consumer.MessageConsumer2"></bean>
 <!-- queue1 litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory">
             <rabbit:listener queues="queueTest1" ref="messageReceiver2"/>
    </rabbit:listener-container>
</beans>

在Spring的配置文件application.xml指定扫描包的路径,将注释注册为Spring Beans

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<import resource="classpath*:rabbitmq.xml" />
<!-- 扫描指定package下所有带有如@controller,@services,@resource,@ods并把所注释的注册为Spring Beans -->
<context:component-scan base-package="com.cn.chenxyt.producer,com.cn.chenxyt.consumer" />
<!-- 激活annotation功能 -->
<context:annotation-config />
<!-- 激活annotation功能 -->
<context:spring-configured />
</beans>

创建消息生产者 MessageProducer.java 通过在配置文件管理的bean来管理消息的发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.cn.chenxyt.producer;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {
private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
@Resource
private AmqpTemplate amqpTemplate;
public void sendMessage(Object message){
logger.info("我要发送消息给消费者:{}",message);
amqpTemplate.convertAndSend("这条消息来自生产者==" +message);
}
}

创建消息消费者1和消费者2,建立监听,监听名与配置文件中的监听相同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.cn.chenxyt.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MessageConsumer1 implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer1.class);
@Override
public void onMessage(Message message) {
logger.info("我是消费者1我收到消息了:{}",message);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.cn.chenxyt.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MessageConsumer2 implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer2.class);
@Override
public void onMessage(Message message) {
logger.info("我是消费者2我收到消息了:{}",message);
}
}

创建启动类,加载spring配置文件,并获取发送者的bean进行消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.cn.chenxyt.producer;
import java.util.Random;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ProjectStart {
private static ApplicationContext context = null;
public static void main(String[] args) throws InterruptedException {
context = new ClassPathXmlApplicationContext("application.xml");
MessageProducer messageProducer = (MessageProducer) context.getBean("messageProducer");
Random random = new Random();
while(true){
Thread.sleep(2000);
messageProducer.sendMessage(random.nextInt());
}
}

}

启动ProjectStart.java可以看见控制台打印出消息的内容

png2

以上就是RabbitMQ整合Spring的示例.

三、关于缓冲池以及并发的控制

    前文讲过关于RabbitMQ消息处理缓冲池prefetch的概念,当两个消费者处理消息的能力差距在千倍级别以上时,可以考虑通过改变prefetch大小的方式来合理的利用资源。这里在整合了Spring之后,还可以通过给处理慢的消费者增开线程的方式来提高处理速度(查阅了很多资料,没有找到如何在不整合Spring的前提下增加线程数量)。对比上文rabbitMQ.xml文件中,两个消费者监听的配置是有不一样的地方的

消费者1

1
2
3
4
5
6
<!-- 消息接收者 -->
<bean id="messageReceiver1" class="com.cn.chenxyt.consumer.MessageConsumer1"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory" concurrency="20" prefetch="1">
<rabbit:listener queues="queueTest" ref="messageReceiver1"/>
</rabbit:listener-container>

消费者2

1
2
3
4
5
6
   <!-- 消息接收者2 -->
<bean id="messageReceiver2" class="com.cn.chenxyt.consumer.MessageConsumer2"></bean>
<!-- queue1 litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="queueTest1" ref="messageReceiver2"/>
</rabbit:listener-container>

    可以看到这里同样有prefetch参数并且功能与之前说的相同,[RabbitMQ学习笔记八:RabbitMQ的消息确认]一文中关于阻塞问题的解决。此外这里消费者1还多了个concurrency参数,这个参数是当前开启的线程总数,也就是同时处理消息的线程数,一个线程启动一个channel通道。这里prefetch =1 concurrency=20与单纯的只设置prefetch=20是不同的,前者启动了20个线程,假设消费者处理能力缓慢,但是也会同时处理20个消息,后者只是单纯的表示我同一时间可以从队列中拿到多少消息,如果消费者处理能力缓慢,需要按顺序执行消息。也就是说前者的效率要远大于后者。

    这里我们模拟演示不同场景下分别使用concurrency和不使用的情况

    场景1:生产者1s发送一条消息,发送20条,消费者 20s处理一条,不使用concurrency以及prefetch

修改配置文件:

1
2
3
4
5
6
<!-- 消息接收者 -->
<bean id="messageReceiver1" class="com.cn.chenxyt.consumer.MessageConsumer1"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="queueTest" ref="messageReceiver1"/>
</rabbit:listener-container>

修改生产者代码,间隔1s发送一条消息,发送20条

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.cn.chenxyt.producer;
import java.util.Random;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ProjectStart {
private static ApplicationContext context = null;
public static void main(String[] args) throws InterruptedException {
context = new ClassPathXmlApplicationContext("application.xml");
MessageProducer messageProducer = (MessageProducer) context.getBean("messageProducer");
Random random = new Random();
for(int i =0;i<20;i++){
Thread.sleep(1000);
messageProducer.sendMessage(random.nextInt());
}
}

}

修改消费者1代码,模拟任务处理时间20s,同时防止日志干扰,注释掉消费者2打印日志的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.cn.chenxyt.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MessageConsumer1 implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer1.class);
@Override
public void onMessage(Message message) {
try {
Thread.sleep(20000);
logger.info("消费者1线程:{}",message);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

    这里有个地方要注意,每做一个测试之前,要先看RabbitMQ控制台已经存在的队列中是否有未处理的消息,有的话及时处理或删除,以免影响测试结果。

    启动ProjectStart。控制台间隔1s打印一条发送消息的日志,一共打印20条,并且间隔20s会打印一条收到消息的日志。RabbitMQ管理台上可以看到开始的时候有1个处于unacked状态的消息,然后从1递增到19个ready状态的消息,total从1递增到20之后开始每隔20s减1,直到最后全部为0(ready是队列中存在的未被消费者接收的消息数量,unacked是被消费者接收但是未返回的消息数量,total是他们二者之和),可以看到eclipse的控制台和RabbitMQ的管理台展示的结果相同,均表明此种情况是一种阻塞状况,20条消息是按照顺序执行的,全部执行完的时间大约是20x20=400s

png3

png4

    场景2:生产者1s发送1条消息,发送20条,消费者20s处理一条消息,设置消费者的concurrency=20,prefetch=1,即启动20个线程,每个线程能缓冲的最大消息数目为1

修改配置文件

1
2
3
<rabbit:listener-container connection-factory="connectionFactory" concurrency="20" prefetch="1">
<rabbit:listener queues="queueTest" ref="messageReceiver1"/>
</rabbit:listener-container>

    等待上一个测试的消息完全处理完成之后,启动ProjectStart.java 可以看到控制台依然间隔1s打印一条发送消息,同时在20s之后开始间隔1s打印一条收到消息,并且RabbitMQ控制台ready,unacked,total数目与eclipse控制台状态相同,同时我们可以看到启动了20个线程,即创建了20个通道。测试结果表明,消息发送出去的时候,就已经被消费者接收了,只不过消息间隔1s发送,所以消息也是间隔1s接收,然后延迟20s打印,所以这种情况处理速度约为20+20=40s

png5

png6

png7

    场景3:生产者间隔1s发送一条消息,发送20次,消费者间隔20s处理一条消息,设置消费者prefetch=20

修改配置文件:

1
2
3
<rabbit:listener-container connection-factory="connectionFactory" prefetch="20">
<rabbit:listener queues="queueTest" ref="messageReceiver1"/>
</rabbit:listener-container>

    确保上次测试的消息成功处理完之后启动ProjectStart.java,控制台间隔1s打印一条消息发出日志,间隔20s打印一条消息收到日志,与场景1的结果相同,不同的地方在于,RabbitMQ控制台ready状态为0,unacked状态间隔1s递增到20之后间隔20s递减,这种情况说明,消息都被消费者拿走了,但是由于消费者处理能力有限(一个线程,间隔时间20s)所以虽然一次拿了20个消息,但是仍然是顺序执行,20s处理一条数据。与场景1不同的是,场景1压力主要在RabbitMQ服务端,而该场景压力在消费者上。这种情况的处理速度与场景1相同约为20x20=400s

png8

png9

以上就是关于整合了spring之后的RabbitMQ并发测试相关内容。

四、代码下载

下载地址

【RabbitMQ】九:RabbitMQ实现RPC

发表于 2018-11-22 | 分类于 学习笔记 | 阅读次数:

一、概述

    前面几篇文章讲述的内容都是单向的消息传递,生产者将消息发送给消费者之后就不再管后续的业务处理了。实际业务中,有的时候我们还需要等待消费者返回结果给我们,或者是说我们需要消费者上的一个功能、一个方法或是一个接口返回给我们相应的值,而往往大型的系统软件,生产者跟消费者之间都是相互独立的两个系统,部署在两个不同的电脑上,不能通过直接对象.方法的形式获取想要的结果,这时候我们就需要用到RPC(Remote Procedure Call)远程过程调用方式。

    RabbitMQ实现RPC的方式很简单,生产者发送一条带有标签(消息ID(correlation_id)+回调队列名称)的消息到发送队列,消费者(也称RPC服务端)从发送队列获取消息并处理业务,解析标签的信息将业务结果发送到指定的回调队列,生产者从回调队列中根据标签的信息获取发送消息的返回结果。

png1

    如图,客户端C发送消息,指定消息的ID=rpc_id,回调响应的队列名称为rpc_resp,消息从C发送到rpc_request队列,服务端S获取消息业务处理之后,将correlation_id附加到响应的结果发送到指定的回调队列rpc_resp中,客户端从回调队列获取消息,匹配与发送消息的correlation_id相同的值为消息应答结果。

    RabbitMQ官网的示例是客户端通过RPC方式调用服务端获取斐波那契数列的值,我们举个简单的例子,客户端通过RPC调用获取服务端求平方的方法返回值。即客户端发送消息4,服务端返回4的平方16

二、几个简单的概念

2.1回调队列

    前文说到,客户端发送消息到服务端之后,要接收返回结果,存放返回结果的队列叫做回调队列,客户端发送消息之后阻塞监听该队列返回的消息。我们可以使用随机队列命名,也可以指定队列的名称,同时,我们可以一个消息建立一个随机队列,但是通常考虑资源使用的情况,我们一般一个消费者建立一个指定的回调队列。

2.2消息属性

    AMQP协议预定了一组14个消息属性(Message Properties),常用的有如下四种消息属性

1、deliveryMode:标记消息传递模式,为2时表示持久化消息,其它值不做持久化,在前面文章讲到消息持久化使用的PERSITNAT_TEXT_PLAIN时提到过
2、contentType:内容类型,用于描述内容编码,如json
3、replyTo:应答,指定的通用的回调队列名称
4、correlationId:关联ID,指定消息的标签,方便关联RPC的请求与响应

    上述四个属性我们使用了replyTo和correlationId属性,同时因为RPC调用是具有幂等性的,所以我们可以忽视不属于我们应该获得到的correlationId。

三、示例代码

    我们梳理一下文章开始提到的简单示例,我们的RPC处理流程如下

1、客户端启动,创建请求队列rpc_request和回调队列rpc_resp
2、客户端为我们的消息请求设置两个消息属性correlationId关联ID和replyTo回调队列(rpc_resp)
3、将请求发送到rpc_request队列
4、RPC服务端监听rpc_request队列中的请求,获取消息处理业务,并把带有接收消息的correlationId的返回结果消息返回到指定回调队列(rpc_resp)
5、客户端监听rpc_resp回调队列,如果有消息,匹配correlationId,如果和请求消息的相同,那么这个消息就是返回的响应结果了。

客户端代码MqRpcClient,相关说明已经写在注释中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.cn.chenxyt.mq;

import java.io.IOException;
import java.util.Collections;
import java.util.Hashtable;
import java.util.Map;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.AMQP.Confirm.SelectOk;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;


public class MqRpcClient{
private final static String REQUEST_QUEUE_NAME="rpc_request";
private final static String RESPONSE_QUEUE_NAME="rpc_resp";
private Channel channel;
private QueueingConsumer qConsumer;

//构造函数 初始化连接
public MqRpcClient() throws IOException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机、用户名、密码和客户端端口号
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
channel = connection.createChannel();
//创建一个请求队列
channel.queueDeclare(REQUEST_QUEUE_NAME, true, false, false, null);
//创建一个回调队列
channel.queueDeclare(RESPONSE_QUEUE_NAME,true,false,false,null);
//为通道创建一个监听(用于监听回调队列,获取返回消息)
qConsumer = new QueueingConsumer(channel);
//关联监听与监听队列 并手动应答
channel.basicConsume(RESPONSE_QUEUE_NAME,false,qConsumer);
}
public String getSquare(String message) throws Exception{
String response = "";
//定义消息属性中的correlationId
String correlationId = java.util.UUID.randomUUID().toString();
//设置消息属性的replTo和correlationId
BasicProperties properties = new BasicProperties.Builder().correlationId(correlationId).replyTo(RESPONSE_QUEUE_NAME).build();
//发送消息到请求队列rpc_request队列 ,前边说到过 如果没有exchange即没有routingKey 消息发送到与routingKey参数相同的队列中
channel.basicPublish("",REQUEST_QUEUE_NAME, properties,message.getBytes());
//阻塞监听
while(true){
QueueingConsumer.Delivery delivery = qConsumer.nextDelivery();
if(delivery.getProperties().getCorrelationId().equals(correlationId)){
response = new String(delivery.getBody(),"UTF-8");
//手动回应消息应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
break;
}
}
return response;
}
public static void main(String[] args) throws Exception {
MqRpcClient rpcClient = new MqRpcClient();
String result = rpcClient.getSquare("4");
System.out.println("resonse is :" + result);
}
}

服务端代码MqRpcServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.cn.chenxyt.mq;
import java.io.IOException;
import java.text.NumberFormat;
import java.util.Hashtable;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class MqRpcServer {
private final static String REQUEST_QUEUE_NAME="rpc_request";

public static void main(String[] args) throws Exception{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(REQUEST_QUEUE_NAME, true, false, false, null);
//设置prefetch值 一次处理1条数据
channel.basicQos(1);
//为请求队列设置监听 监听客户端请求 并手动应答
QueueingConsumer qConsumer = new QueueingConsumer(channel);
channel.basicConsume(REQUEST_QUEUE_NAME,false, qConsumer);
System.out.println("Server waiting Requeust.");
while(true){
QueueingConsumer.Delivery delivery = qConsumer.nextDelivery();
//将请求中的correlationId设置到回调的消息中
BasicProperties properties = delivery.getProperties();
BasicProperties replyProperties = new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
//获取客户端指定的回调队列名
String replyQueue = properties.getReplyTo();
//返回获取消息的平方
String message = new String(delivery.getBody(),"UTF-8");
System.out.println("waiting message is:" + message);
Double mSquare = Math.pow(Integer.parseInt(message),2);
String repMsg = String.valueOf(mSquare);
channel.basicPublish("",replyQueue,replyProperties,repMsg.getBytes());
//手动回应消息应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}

启动客户端、启动服务端,可以看到控制台打出的结果,符合我们的预期结果,管理台也新建了两条队列。

png2

png3

png4

四、总结

    综上,RabbitMQ的RPC调用方式就是形成了两条队列,两个客户端(服务端相互监听),需要注意的是,如前篇所述,如果开启手动回复,要记得在代码中手动回复ACK。

五、代码下载

下载地址

【RabbitMQ】八:RabbitMQ的消息确认

发表于 2018-11-22 | 分类于 学习笔记 | 阅读次数:

一、概述

    前文说到RabbitMQ的交换机、队列、消息的持久化并不能100%的保证消息不会丢失。首先从生产者端,持久化的消息在RabbitMQ同步到磁盘之前,还需要一段时间,这个时间很短,但是不容忽视。假如此时服务器宕机了,那么消息就丢失了。这种发生在生产者上的消息丢失我们可以使用镜像队列和事务机制来保证数据的完整性。其次是消费者端,假如消费者拿到消息还未处理,发生异常而崩溃,此时这条消息队列中已经没有了,而我们的业务还需要这条消息,那么这种情况也算是消息丢失。在消费者端发生的消息丢失可以通过消费者的消息确认机制来解决。当然无论哪种方式对RabbitMQ的性能都有一定的影响。本文主要对RabbitMQ对于生产者和消费者不同的消息确认方式做一个了解,并解决在消息确认中出现的阻塞问题。

二、事务管理(生产者)

    事务管理的操作是针对于生产者向RabbitMQ服务器发送消息这一过程的。RabbitMQ对事务的管理有如下两个层面的方式:

1、AMQP协议层面,基于AMQP的事务机制

2、通道层面,将channel设置成confirm

2.1事务机制

    RabbitMQ提供了txSelect()、txCommit()和txRollback()三个方法对消息发送进行事务管理,txSelect用于将通道channel开启事务模式,txCommit用于提交事务,txRollback用户进行事务回滚操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
try{
//channel开启事务模式
channel.txSelect();
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//模拟异常
int n = 1/0;
//提交事务
channel.txCommit();
}catch(Exception e){
e.printStackTrace();
channel.txRollback();
}

假如在txCommit之前发生了异常,那么就可以通过Rollback进行回滚操作。

    以上是基于AMQP协议层的事务机制,确保了数据在生产者与RabbitMQ服务器之间的可靠性,但是性能开销较大。

2.2Confirm模式

    RabbitMQ提供了一种低消耗的事务管理方式,将channel设置成confirm模式。confirm模式的channel,通过该channel发出的消息会生成一个唯一的有序ID(从1开始),一旦消息成功发送到相应的队列之后,RabbitMQ服务端会发送给生产者一个确认标志,包含消息的ID,这样生产者就知道该消息已经发送成功了。如果消息和队列是持久化的,那么当消息成功写入磁盘之后,生产者会收到确认消息。此外服务端也可以设置basic.ack的mutiple域,表明是否是批量确认的消息,即该序号之前的所有消息都已经收到了。

    confirm的机制是异步的,生产者可以在等待的同时继续发送下一条消息,并且异步等待回调处理,如果消息成功发送,会返回ack消息供异步处理,如果消息发送失败发生异常,也会返回nack消息。confirm的时间没有明确说明,并且同一个消息只会被confirm一次。

    我们在生产者使用如下代码开启channel的confirm模式,并且已经开启事务机制的channel是不能开启confirm模式的

1
channel.confirmSelect();

处理ack或者nack的方式有三种:

1、串行confirm:每发送一条消息就调用waitForConfirms()方法等待服务端confirm

1
2
3
4
5
6
7
8
9
//开启confirm模式
channel.confirmSelect();
String message = "Hello World";
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//判断是否回复
if(channel.waitForConfirms()){
System.out.println("Message send success.");
}

其中waitForConfirms可以换成带有时间参数的方法waitForConfirms(Long mills)指定等待响应时间

2、批量confirm:每发送一批次消息就调用waitForConfirms()方法等待服务端confirm

1
2
3
4
5
6
7
8
9
10
11
12
13
//开启confirm模式
channel.confirmSelect();
for(int i =0;i<1000;i++){
String message = "Hello World";
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
if(i%100==0){
//每发送100条判断一次是否回复
if(channel.waitForConfirms()){
System.out.println("Message send success.");
}
}
}

    批量的方法从数量级上降低了confirm的性能消耗,提高了效率,但是有个致命的缺陷,一旦回复确认失败,当前确认批次的消息会全部重新发送,导致消息重复发送。所以批量的confirm虽然性能提高了,但是消息的重复率也提高了。

3、异步confirm:使用监听方法,当服务端confirm了一条或多条消息后,调用回调方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//声明一个用来记录消息唯一ID的有序集合SortedSet
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
//开启confirm模式
channel.confirmSelect();
//异步监听方法 处理ack与nack方法
channel.addConfirmListener(new ConfirmListener() {
//处理ack multiple 是否批量 如果是批量 则将比该条小的所有数据都移除 否则只移除该条
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
//处理nack 与ack相同
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("There is Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
while (true) {
//获取消息confirm的唯一ID
long nextSeqNo = channel.getNextPublishSeqNo();
String message = "Hello World.";
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//将ID加入到有序集合中
confirmSet.add(nextSeqNo);
}

三、消息确认ack(消费者)

    为了保证RabbitMQ能够感知消费者正确取到了消息,RabbitMQ提供了消息确认机制,与给生产者回复ACK的方式类似,当队列发送一条消息给消费者时,会记录一个unack标志,当消费者拿到消息之后,会回复一个ack标志,从而抵消了原来的unack标志。一般情况下,我们默认是开启了自动回复ack的标志,即当消费者拿到消息之后立即回复ack而不管消息是否正确被处理,这个时间很快,以至于基本看不到unack的状态。如开篇说到,这里存在一个严重的问题,假如消息在业务处理的过程中发生异常crash了,那么这条消息就消失了,持久化也不会解决这个问题。这里就需要我们在日常的业务处理中,消费者要手动的确认消息。确认消息包括两种,一种是ack,另一种是unack,unack是表明我这条消息处理异常了,可以设置参数告诉MQ服务器是否需要将消息重新放入到队列中。同时,如果开启了手动回复确认的消费者,当消费者异常断开时,没有回复的消息会被重新放入队列供给其他消费者使用。所以程序员必须一定要记得回复消息确认,不然会导致消息重复或者大量的消息堆积。

    下面将通过一个简单的示例,演示手动回复消息确认和忘记回复消息确认的场景。示例场景:一个队列下有两个手动回复消息确认的消费者,两个消费者会按照系统自带的轮训机制获取消息,即一个获取奇数的消息,一个获取偶数的消息。

    1、消费者1和2手动回复消息(正常情况)

    2、消费者1和2手动回复消息,且消费者1忘了手动回复并且读取一部分数据之后发生异常(异常情况)

编写生产者代码,生产者发送1000条消息,并且没有消息间隔。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.cn.chenxyt.mq;

import java.io.IOException;
import java.util.Collections;
import java.util.Hashtable;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.AMQP.Confirm.SelectOk;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;


public class MqProducer {
public final static String EXCHANGE_NAME="EXCHANGE_MQ";
public final static String QUEUE_NAME="queue";
public void sendMessage() throws IOException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机、用户名、密码和客户端端口号
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建一个交换机
// channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
//创建一个队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");
for(int i =1;i<1000;i++){
String message = "Hello World" + (i);
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("Message send success:" + message);
}

}
}

接下来编写消费者代码,与之前相同,有两个消费者,消费者1和消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.cn.chenxyt.mq;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer1 {
private static String EXCHANGE_NAME="EXCHANGE_MQ";
private final static String QUEUE_NAME="queue";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout",true);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "",null);
System.out.println("Consumer1 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer1 Received '" + message + "'");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
//false 不自动回复应答
channel.basicConsume(QUEUE_NAME,false, consumer);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.cn.chenxyt.mq;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer2 {
private static String EXCHANGE_NAME="EXCHANGE_MQ";
private final static String QUEUE_NAME="queue";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout",true);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "",null);
System.out.println("Consumer2 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer2 Received '" + message + "'");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
//false 不自动回复应答
channel.basicConsume(QUEUE_NAME,false, consumer);
}
}

    这里basicConsume设置为false为不自动应答,同时为了保证业务正常执行完,回复确认要写在finally代码块里。channel.basicAck()回复处理正确,channel.basicNAck()回复处理失败,参数设置为true为重新加入队列。

启动消费者1和2再启动生产者,因为两个消费者对消息延迟2s才回复,所以队列中积累了大量的unack消息

png1

png2

png3

    接下来修改消费者1代码,看一下如果程序没有回复ack确认是什么样子,注释掉消费者1的ack确认,并把生产者发送数据条数改成10条(这里如果在上边的例子改,需要保证队列里没有数据,可以在管理台把队列删掉,也可以停掉消费者把sleep时间改短然后启动把之前的消息接收完毕,当然也可以在上边测试的时候就把发送消息的数目改小一些)

1
2
3
4
5
6
7
8
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
//channel.basicAck(envelope.getDeliveryTag(), false);
}

    接下来启动消费者1和消费者2以及生产者,可以看到10条消息,有五条发送到消费者1,五条发送到消费者2,同时在消息接收完毕的时候,由于消费者1没有ack,所以管理台上一直有5个unack状态

png4

png5

png6

    这时我们停掉消费者1,模拟消费者1crash断开的状态,可以看到消费者2收到了消费者1没有ack的消息,并且管理台队列里的unack状态也没有了

png7

png8

以上就是关于消费者自动回复消息确认的相关内容。

四、阻塞的问题解决

    这里思考一个问题,就是当消费者1和消费者2都开启手动回复并且在业务执行完成之后都进行了回复,如果生产者发送了大量消息,而消费者处理业务的时间(我们用sleep时间模拟)又过长,就会导致消息队列中阻塞大量未unack的消息,会降低系统性能,即便我们把消费者2的sleep时间调低,消费者1仍然是2s处理一条消息,消费者2迅速处理完,队列中仍然积累一半unack的消息,这是为什么呢?这是因为每个消费者会有一个缓冲池prefetch的概念,prefetch是消费者一次能处理的最大unack的数量,消费者获取消息时,实际上是mq先放到了这个缓冲池中,当ack一个之后,mq从缓冲池中拿掉一个。而MQ的轮训机制恰好是按顺序分发,因为我们这里没有设置缓冲池的大小,也就是消费者一次最多能拿多少个消息没有设置,所以MQ默认你的处理能力很好,会按照顺序将消息全部分发完。所以这里就会看到消费者1刚好打印的都是奇数的消息,消费者2刚好打印的是偶数的消息。

    所以阻塞的问题的解决方案就是我们合理的设置prefetch大小,这样处理快的消费者就能够处理更多的消息,处理慢的消费者也不会发生长时间的阻塞。更详细的描述,假设有两个消费者,都设置prefetch大小为10,消费者1处理业务时间是2s,消费者2处理业务时间是2ms,那么就不会出现上边的情况消费者1积累大量的unack,这里最多的unack数目就是两个prefetch的大小之和20,同时,MQ分发消息是先塞满10个到消费者1,再塞满10个到消费者2,塞第21个的时候,先看消费者1的缓冲池有没有空位,没有的话去看消费者2,因为消费者2的处理速度比1快1000倍,所以1000条数据前10条塞给消费者1之后,后边的数据就都塞给消费者2了。

设置prefetch大小的方法,在消费者中加入如下代码

1
channel.basicQos(10);

    为了更好的说明上边的详细描述,我把代码贴出来,变化就是生产者一次发1000条信息,消费者1和消费者2设置最大prefetch值为10,同时消费者1的处理业务时间(sleep时间)2s,消费者2处理业务时间2ms

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.cn.chenxyt.mq;

import java.io.IOException;
import java.util.Collections;
import java.util.Hashtable;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.AMQP.Confirm.SelectOk;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;


public class MqProducer {
private final static String EXCHANGE_NAME="EXCHANGE_MQ";
private final static String QUEUE_NAME="queue";

public static void main(String[] args) throws IOException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机、用户名、密码和客户端端口号
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建一个交换机
// channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
//创建一个队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");
for(int i =1;i<=1000;i++){
String message = "Hello World" + (i);
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("Message send success:" + message);
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.cn.chenxyt.mq;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer1 {
private static String EXCHANGE_NAME="EXCHANGE_MQ";
private final static String QUEUE_NAME="queue";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout",true);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "",null);
channel.basicQos(10);
System.out.println("Consumer1 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer1 Received '" + message + "'");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
//channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
//false 不自动回复应答
channel.basicConsume(QUEUE_NAME,false, consumer);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.cn.chenxyt.mq;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer2 {
private static String EXCHANGE_NAME="EXCHANGE_MQ";
private final static String QUEUE_NAME="queue";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout",true);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "",null);
channel.basicQos(10);
System.out.println("Consumer2 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer2 Received '" + message + "'");
try {
Thread.sleep(2);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
//false 不自动回复应答
channel.basicConsume(QUEUE_NAME,false, consumer);
}
}

    这里为了验证unak的数目与prefetch的关系,我们消费者1注掉回复确认消息的代码,启动消费者1和消费者2以及生产者

png9

png10

png11

如图可见,消费者1只处理了10条消息,消费者2把其他的消息处理了。合理的利用了有限的资源。

五、总结

    本文主要讲述了RabbitMQ与生产者和消费者之间的消息确认以及消费者手动确认消息带来的阻塞问题的解决之道。生产者的消息确认有事务机制和confirm模式两种,消费者通过自动回复ack和手动回复ack的方式确认,手动ack切记有ack和nack两种,合理安排使用。消费者手动确认带来的阻塞问题是由于没有设置缓冲池的大小,可以通过设置prefetch的大小来限制每个消费者能持有的最大unack的数量,合理的分配资源

【RabbitMQ】七:交换机、队列、消息的持久化

发表于 2018-11-22 | 分类于 学习笔记 | 阅读次数:

一、概述

    在生产过程中,难免会发生服务器宕机的事情,RabbitMQ也不例外,可能由于某种特殊情况下的异常而导致RabbitMQ宕机从而重启,那么这个时候对于消息队列里的数据,包括交换机、队列以及队列中存在消息恢复就显得尤为重要了。RabbitMQ本身带有持久化机制,包括交换机、队列以及消息的持久化。持久化的主要机制就是将信息写入磁盘,当RabbtiMQ服务宕机重启后,从磁盘中读取存入的持久化信息,恢复数据。(当然凡是都不是100%的,只能尽最大程度的保证消息不会丢失吧)

二、交换机的持久化

    在前面的示例中,我们使用常规的声明交换机的方法

1
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

    使用这种方法声明的交换机,默认不是持久化的,在服务器重启之后,交换机会消失。我们在管理台的Exchange页签下查看交换机,可以看到使用上述方法声明的交换机,Features一列是空的,即没有任何附加属性。

png1

我们换用另一种方法声明交换机

1
channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);

查看一下方法的说明

1
2
3
4
5
6
7
8
9
10
11
/**
* Actively declare a non-autodelete exchange with no extra arguments
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @throws java.io.IOException if an error is encountered
* @return a declaration-confirm method to indicate the exchange was successfully declared
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

    我们可以看到第三个参数durable,如果为true时则表示要做持久化,当服务重启时,交换机依然存在,所以使用该方法声明的交换机是下面这个样子的(做测试的时候,需要先在管理台删掉原来的同名交换机)D表示durable,鼠标放在上边会显示为true

png2

    现在重启RabbitMQ服务之后,可以看到我们声明的交换机仍然存在。

三、队列的持久化

    与交换机的持久化相同,队列的持久化也是通过durable参数实现的,默认生成的随机队列不是持久化的。前面示例中声明的带有我们自定义名字的队列都是持久化的。

1
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

看一下方法的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;

    第二个参数跟交换机方法的参数一样,true表示做持久化,当RabbitMQ服务重启时,队列依然存在。这里说一下后边的三个参数,exclusive是排他队列,如果一个队列被声明为排他队列,那么这个队列只能被第一次声明他的连接所见,并在连接断开的时候自动删除。这里有三点需要说明,1、同一个连接的不同channel,是可以访问同一连接下创建的排他队列的。2、排他队列只能被声明一次,其他连接不允许声明同名的排他队列。3、及时排他队列是持久化的,当连接断开或者客户端退出时,排他队列依然会被删除。autoDelete是自动删除,为true时,当没有任何消费者订阅该队列时,队列会被自动删除。arguments:其它参数

四、消息持久化

    消息的持久化是指当消息从交换机发送到队列之后,被消费者消费之前,服务器突然宕机重启,消息仍然存在。消息持久化的前提是队列持久化,假如队列不是持久化,那么消息的持久化毫无意义。

通过如下代码设置消息的持久化

1
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

其中MessageProperties.PERSISTENT_TEXT_PLAIN是设置持久化的参数

我们查看basicPublish方法的定义

1
2
3
4
5
6
7
8
9
10
/**
* Publish a message
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

在看下BasicProperties的类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public BasicProperties(
String contentType,
String contentEncoding,
Map<String,Object> headers,
Integer deliveryMode,
Integer priority,
String correlationId,
String replyTo,
String expiration,
String messageId,
Date timestamp,
String type,
String userId,
String appId,
String clusterId)

    其中deliveryMode是设置消息持久化的参数,等于1不设置持久化,等于2设置持久化。PERSISTENT_TEXT_PLAIN是实例化的一个deliveryMode=2的对象,便于编程。

1
2
3
4
5
6
7
8
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);

设置了队列的持久化和消息的持久化之后,当服务器宕机重启,存在队列中未发送的消息会依然存在。

    以上就是关于RabbitMQ中持久化的一些内容,但是并不会严格的100%保证信息不会丢失,相关内容后续再说明。

五、总结

    RabbitMQ的持久化有交换机、队列、消息的持久化。用于防止服务器宕机重启之后数据的丢失,其中交换机和队列的持久化都是设置durable参数为true,消息的持久化是设置Properties为MessageProperties.PERSITANT_TEXT_PLAIN,消息的持久化基于队列的持久化。持久化不是100%完全保证消息的可靠性。

【RabbitMQ】六:Exchange-headers

发表于 2018-11-21 | 分类于 学习笔记 | 阅读次数:

一、概述

    前面三篇文章讲述了RabbitMQ 常用的三种Exchange类型,这篇文章学习一下第四种不常用的Exchange类型:Headers这种类型与topic类型类似,只不过不是匹配routingKeys,是匹配AMQP协议中的Header,Header是一个HashTable类型的键值对,而routingKey是String类型的字符串。功能与Topic相同,消息发送者绑定消息的键值对,匹配交换机与队列之间绑定的键值对,匹配规则“x-match”有两种,一种是“any”,只要一组键值对匹配成功即可发送消息到该队列,另一种是“all”,即需要所有键值对都匹配才可以发送消息。

    大概的场景应用示意图如下,详细说明见示例代码:

png1

二、源代码

我们先测试any类型的headers,先写生产者代码,相关说明已在注释中标明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.cn.chenxyt.mq;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;


public class MqProducer {
public final static String EXCHANGE_NAME="EX_HEADER";
public static void main(String[] args) throws IOException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机、用户名、密码和客户端端口号
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"headers");
//定义发送消息的要绑定的键值对
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("aaa", "111");
headers.put("bbb", "222");
Builder properties = new BasicProperties.Builder();
properties.headers(headers);
for(int i = 0;i<500;i++){
String message = "hello" + (i);
//发送消息 绑定header键值对
channel.basicPublish(EXCHANGE_NAME,"",properties.build(),message.getBytes());
System.out.println("发送消息:" + message);
Thread.sleep(2000);
}
}
}

消费者1代码,相关说明已经在注释中标明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.cn.chenxyt.mq;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer1 {
public final static String EXCHANGE_NAME="EX_HEADER";
public final static String QUEUE_NAME="queue1";


public static void main(String[] args) throws IOException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"headers");
//定义绑定规则
Map<String, Object> headers = new Hashtable<String, Object>();
//any 匹配任意一组即可 all 全部匹配
headers.put("x-match", "any");
headers.put("aaa", "111");
headers.put("bbb", "222");
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "",headers);
System.out.println("Consumer1 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer1 Received '" + message + "'");
// int i = 1/0;
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
//false 不自动回复应答
channel.basicConsume(QUEUE_NAME,true, consumer);
}
}

    消费者2代码,与1基本相同,只不过新建个队列和绑定的header,有一点要说明一下,所有新测试的交换机类型,都需要把之前已经存在的同名的交换机或者同名的队列删除,不然的话新建不会生效,即使参数不同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.cn.chenxyt.mq;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer2 {
public final static String EXCHANGE_NAME="EX_HEADER";
public final static String QUEUE_NAME="queue2";
public static void main(String[] args) throws IOException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("localhost");
//创建一个新的连接 即TCP连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"headers");
//定义绑定规则
Map<String, Object> headers = new Hashtable<String, Object>();
//any 匹配任意一组即可 all 全部匹配
headers.put("x-match", "any");
headers.put("aaa", "111");
headers.put("ccc", "333");
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "",headers);
System.out.println("Consumer2 Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个channel,
//告诉服务器我们需要哪个channel的消息并监听channel,如果channel中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer2 Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

    分别启动消费者1和消费者2,使他们处在监听状态,可以看到管理台有新建的headers类型的交换机和两个队列

png2

png3

png4

png5

png6

    启动生产者,可以看到消费者1和消费者2都收到了消息。说明any功能生效,即匹配到了任意一组键值对即可发送消息。

png7

png8

png9

    接下来我们测试all的情况,在管理台删除已经存在的两条队列queue1、queue2,修改消费者1和消费者2中的any修改为all

1
2
//any 匹配任意一组即可 all 全部匹配
headers.put("x-match", "all");

这时我们重新启动消费者1和消费者2使他们处于监听状态,可以在管理台看见绑定规则x-match变为all

png10

启动生产者,可以看见消费者1和消费者2都收不到消息了

png11

png12

png13

修改生产者代码,新增一组键值对,保证与queue1绑定的headers键值对完全匹配

1
2
3
4
//定义发送消息的要绑定的键值对
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("aaa", "111");
headers.put("bbb", "222");

重新启动生产者,可以看到消费者1收到了消息,消费者2没有收到消息,即all的匹配规则生效了。

png14

png15

png16

    以上就是关于headers类型的exchange的应用示例,实际应用场景中,同类型的更偏向于使用direct类型的交换机。

三、代码下载

下载地址

12…4

Crayon Cxy

Go over the mountain, and they will hear your story.

38 日志
3 分类
14 标签
友情链接
  • 六脉神间
  • My CSDN
© 2019 Crayon Cxy
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.4