在Java并发包中,对于线程同步,有着CountDownLatchCyclicBarrier两个类,两者有什么区别,怎样使用呢

介绍

CountDownLatch

CountDownLatch可以使一个获多个线程等待其他线程各自执行完毕后再执行。

CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。

CyclicBarrier

CyclicBarrier可以使一定数量的线程反复地在“栅栏”位置处汇集。

当线程到达“栅栏”位置时将调用await方法,这个方法将阻塞直到所有线程都到达“栅栏”位置。如果所有线程都到达“栅栏”位置,那么“栅栏”将打开,此时所有的线程都将被释放,而“栅栏”将被重置以便下次使用。

使用

可以通过两个简单的demo说明两者用法上的区别,首先是CyclicBarrier的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CyclicBarrier cb = new CyclicBarrier(3, () -> {
System.out.println("Barrier reached");
});

ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
pool.submit(() -> {
System.out.println(Thread.currentThread().getName());
try {
cb.await();
} catch (Exception e) {
e.printStackTrace();
}
});
}
pool.close();

可以看到,CyclicBarrier的await()方法执行后,会阻塞住当前线程,底层使用了ReentrantLock::lock()方法加锁,在所有线程都调用了await()使得CyclicBarrier计数器到达0之后,则会在当前线程中回调行定义时的代码块(也可不定义),参考如下代码片段

1
2
3
4
5
6
7
8
9
10
11
12
13
if (index == 0) {  // tripped
Runnable command = barrierCommand;
if (command != null) {
try {
command.run();
} catch (Throwable ex) {
breakBarrier();
throw ex;
}
}
nextGeneration();
return 0;
}

最后回到调用await()的地方继续往下执行。

然后是CountDownLatch,参考下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CountDownLatch cd = new CountDownLatch(3);
ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
pool.submit(() -> {
cd.countDown();
System.out.println(Thread.currentThread().getName());
});
}
try {
cd.await();
System.out.println("ok");
} catch (Exception e) {
e.printStackTrace();
}
pool.close();

可以看到,CountDownLatch和CyclicBarrier使用上还是有一定的区别,在调用countDown()之后并不会使当前线程阻塞,只是给内部的计时器递减1。

首先看下await()方法

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

这里会阻塞住线程。

然后是countDown()方法

1
2
3
public void countDown() {
sync.releaseShared(1);
}

这里sync是CountDownLatch内部类Sync的对象,而Sync又是java中AbstractQueuedSynchronizer(AQS)的派生类,所以可以看出CountDownLatch通过AQS实现。接下来继续看Sync中tryReleaseShared(int)的实现

1
2
3
4
5
6
7
8
9
10
11
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

可以看到,这里使用CAS操作更新标记位,这里的标记位就是上面的计数器,当标记位为0后就会将阻塞的线程放行。

总结

  • CountdownLatch适用于所有线程通过某一点后通知方法,而CyclicBarrier则适合让所有线程在同一点同时执行

  • CountdownLatch利用继承AQS的共享锁来进行线程的通知,利用CAS来进行,而CyclicBarrier则利用ReentrantLock的Condition来阻塞和通知线程