并发工具类

并发工具类

CountDownLatch

它是一个同步辅助类,在计数到达 0 之前,使线程一直处于等待状态,直到计数结束,线程才继续工作

用法

一个线程等待多个线程都执行完毕,再继续自己的工作
多个线程等待某一个线程的信号,同时开始执行

主要方法
  • CountDownLatch:仅有一个构造函数,参数 count 为需要倒数的数值
  • await:调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
  • countDown:将 count 值减 1,直到为 0 时,等待的线程会被唤起
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 描述:模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步
*/
public class CountDownLatchDemo {

public static void main(String[] args) throws InterruptedException {

CountDownLatch begin = new CountDownLatch(1);

ExecutorService service = Executors.newFixedThreadPool(5);

for (int i = 0; i < 5; i++) {

final int no = i + 1;

Runnable runnable = () -> {

System.out.println("No." + no + "准备完毕,等待发令枪");

try {
begin.await();
System.out.println("No." + no + "开始跑步了");
} catch (InterruptedException e) {
e.printStackTrace();
}
};

service.submit(runnable);
}

// 裁判员检查发令枪
Thread.sleep(5000);
System.out.println("发令枪响,比赛开始!");
begin.countDown();
}
}
注意

CountDownLatch 是不能够重用的,如果需要重新计数,可以考虑使用 CyclicBarrier 或者创建新的 CountDownLatch 实例

CyclicBarrier

CyclicBarrier 和 CountDownLatch 很类似,都能阻塞一组线程
当有大量线程相互配合,分别计算不同任务,并且需要最后统一汇总的时候,我们可以使用 CyclicBarrier,它可以构造一个集结点,当某一个线程执行完毕,就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就被撤销,所有线程再统一出发,继续执行剩下的任务

示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 描述:集合点有5个人就出发
*/
public class CyclicBarrierDemo {

public static void main(String[] args) {

CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("所有人都到场了,大家统一出发"));

for (int i = 0; i < 10; i++) {
new Thread(new Task(i, cyclicBarrier)).start();
}
}

static class Task implements Runnable {

private int id;

private CyclicBarrier cyclicBarrier;

public Task(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程" + id + "现在前往集合地点");
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("线程" + id + "到了集合地点,开始等待其他人到达");
cyclicBarrier.await();
System.out.println("线程" + id + "出发了");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
和 CountDownLatch 的区别
  • 作用不同:CyclicBarrier 要等固定数量的线程都到达了栅栏位置才能继续执行,而 CountDownLatch 只需等待数字到 0,也就是说,CountDownLatch 用于事件,但是 CyclicBarrier 是用于线程的
  • 可重用性不同:CountDownLatch 在倒数到 0 并触发门闩打开后,就不能再次使用了,除非新建新的实例,而 CyclicBarrier 可以重复使用

Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用公共资源,当达到指定数量时,只能等待其他线程释放信号量

主要方法
  • Semaphore:创建一个信号量,参数 permits 为许可证数量,fair 是否公平
  • tryAcquire:尝试获取许可证,参数 timeout 指定超时时间
  • acquire:获得许可证(如果有的话),然后立即返回,使可用许可证的数量减少一个
  • release:释放许可证
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 描述:同时只允许存在5个许可证
*/
public class SemaphoreDemo {

static Semaphore semaphore = new Semaphore(5, true);

public static void main(String[] args) {

ExecutorService service = Executors.newFixedThreadPool(50);

for (int i = 0; i < 100; i++) {

service.submit(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "拿到了许可证");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放了许可证");
semaphore.release();
}
});
}
service.shutdown();
}
}
注意
  • 获取和释放的许可证数量必须一致,否则比如每次都获取 2 个但是只释放 1 个甚至不释放,随着时间的推移,到最后许可证数量不够用,会导致程序卡死,虽然信号量类并不对释放和获取的数量做规定,但是这是我们的编程规范,否则容易出错
  • 注意在初始化 Semaphore 的时候设置公平性,一般设置为 true 会更合理
  • 并不是必须由获取许可证的线程释放那个许可证,事实上,获取和释放许可证对线程并无要求,也许是 A 获取了,然后由 B 释放,只要逻辑合理即可
  • 信号量的作用,除了控制临界区最多同时有 N 个线程访问外,另一个作用是可以实现“条件等待”,例如线程 1 需要在线程 2 完成准备工作后才能开始工作,那么就线程 1 获取许可证,而线程 2 完成任务后释放许可证,这样的话,相当于是轻量级的 CountDownLatch

Condition

如果说 Lock 用来代替 synchronized,那么 Condition 就是用来代替相对应的 Object.wait/notify 的,所以在用法和性质上几乎都一样
await 方法会自动释放持有的 Lock 锁,和 Object 的 wait 一样,不需要自己手动释放锁
另外,调用 await 的时候,必须持有锁,否则会抛出异常,和 Object.wait 一样

示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* 描述:用Condition实现生产者消费者模式
*/
public class ConditionDemo2 {

private final int queueSize = 10;

private final PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);

private final Lock lock = new ReentrantLock();

private final Condition notFull = lock.newCondition();

private final Condition notEmpty = lock.newCondition();

public static void main(String[] args) {
ConditionDemo2 conditionDemo2 = new ConditionDemo2();
Producer producer = conditionDemo2.new Producer();
Consumer consumer = conditionDemo2.new Consumer();
producer.start();
consumer.start();
}

class Consumer extends Thread {

@Override
public void run() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("队列空,等待数据");
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
notFull.signalAll();
System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
} finally {
lock.unlock();
}
}
}
}

class Producer extends Thread {

@Override
public void run() {
while (true) {
lock.lock();
try {
while (queue.size() == queueSize) {
System.out.println("队列满,等待有空余");
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1);
notEmpty.signalAll();
System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
} finally {
lock.unlock();
}
}
}
}
}