摘录自 极客时间

信号量模型

模型图如下所示:
p1

  • init():设置计数器的初始值
  • down():计数器的值-1,如果此时计数器的值小于0(没资源了),则当前线程被阻塞,否则继续进行
  • up():计数器的值+1,如果此时计数器的值小于或者等于0(大于0说明没有被阻塞的线程,因为资源够用),那么唤醒等待队列中的一个线程,并且将它从等待队列中移除。

这三个方法都是原子性的。
下面代码为一个简单的信号量模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Semaphore{
// 计数器
int count;
// 等待队列
Queue queue;
// 初始化操作
Semaphore(int c){
this.count=c;
}
//
void down(){
this.count--;
if(this.count<0){
// 将当前线程插入等待队列
// 阻塞当前线程
}
}
void up(){
this.count++;
if(this.count<=0) {
// 移除等待队列中的某个线程 T
// 唤醒线程 T
}
}
}

在java SDK中down()up()对应acquire()release()

使用信号量实现简单的互斥:

1
2
3
4
5
6
7
8
9
10
11
12
13
static int count;
// 初始化信号量
static final Semaphore s
= new Semaphore(1);
// 用信号量保证互斥
static void addOne() {
s.acquire();
try {
count+=1;
} finally {
s.release();
}
}

假设两个线程 T1 和 T2 同时访问 addOne() 方法,当它们同时调用 acquire() 的时候,由于 acquire() 是一个原子操作,所以只能有一个线程(假设 T1)把信号量里的计数器减为 0,另外一个线程(T2)则是将计数器减为 -1。对于线程 T1,信号量里面的计数器的值是 0,大于等于 0,所以线程 T1 会继续执行;对于线程 T2,信号量里面的计数器的值是 -1,小于 0,按照信号量模型里对 down() 操作的描述,线程 T2 将被阻塞。所以此时只有线程 T1 会进入临界区执行count += 1
当线程 T1 执行 release() 操作,也就是 up() 操作的时候,信号量里计数器的值是 -1,加 1 之后的值是 0,小于等于 0,按照信号量模型里对 up() 操作的描述,此时等待队列中的 T2 将会被唤醒。于是 T2 在 T1 执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。

实现一个限流器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用 func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 创建对象池
ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
// 通过对象池获取 t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});

List保存了对象实例,用Semaphore实现了限流器。关键代码是ObjPool中的exec()方法,这个方法实现了限流的功能。在这个方法里面,首先调用 acquire() 方法,假设对象池的大小是 10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在 acquire() 方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用 release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。

PS:和管程相比,信号量可以实现的独特功能就是同时允许多个线程进入临界区,但是信号量不能做的就是同时唤醒多个线程去争抢锁,只能唤醒一个阻塞中的线程,而且信号量模型是没有Condition的概念的,即阻塞线程被醒了直接就运行了而不会去检查此时临界条件是否已经不满足了,基于此考虑信号量模型才会设计出只能让一个线程被唤醒,否则就会出现因为缺少Condition检查而带来的线程安全问题。正因为缺失了Condition,所以用信号量来实现阻塞队列就很麻烦,因为要自己实现类似Condition的逻辑。