Semaphore信号量也是Java中的一个同步器,与CountDownLatch和CycleBarrier不同的是,它内部的计数器是递增的,并且在一开始初始化Semaphore时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。
同样下面的例子也是在主线程中开启两个子线程让它们执行,等所有子线程执行完毕后主线程再继续向下运行。
输出结果如下。
如上代码首先创建了一个信号量实例,构造函数的入参为0,说明当前信号量计数器的值为0。
然后main函数向线程池添加两个线程任务,在每个线程内部调用信号量的release方法,这相当于让计数器值递增1。
最后在main线程里面调用信号量的acquire方法,传参为2说明调用acquire方法的线程会一直阻塞,直到信号量的计数变为2才会返回。
看到这里也就明白了,如果构造Semaphore时传递的参数为N,并在M个线程中调用了该信号量的release方法,那么在调用acquire使M个线程同步时传递的参数应该是M+N。
下面举个例子来模拟CyclicBarrier复用的功能,代码如下。
输出结果为
如上代码首先将线程A和线程B加入到线程池。
主线程执行代码(1)后被阻塞。
线程A和线程B调用release方法后信号量的值变为了2,这时候主线程的aquire方法会在获取到2个信号量后返回(返回后当前信号量值为0)。
然后主线程添加线程C和线程D到线程池,之后主线程执行代码(2)后被阻塞(因为主线程要获取2个信号量,而当前信号量个数为0)。
当线程C和线程D执行完release方法后,主线程才返回。
从本例子可以看出,Semaphore在某种程度上实现了CyclicBarrier的复用功能。
为了能够一览Semaphore的内部结构,首先看下Semaphore的类图,如图所示。
由该类图可知,Semaphore还是使用AQS实现的。
Sync只是对AQS的一个修饰,并且Sync有两个实现类,用来指定获取信号量时是否采用公平策略。
例如,下面的代码在创建Semaphore时会使用一个变量指定是否使用公平策略。
在如上代码中,Semaphore默认采用非公平策略,如果需要使用公平策略则可以使用带两个参数的构造函数来构造Semaphore对象。
另外,如CountDownLatch构造函数传递的初始化信号量个数permits被赋给了AQS的state状态变量一样,这里AQS的state值也表示当前持有的信号量个数。
下面来看Semaphore实现的主要方法。
当前线程调用该方法的目的是希望获取一个信号量资源。
如果当前信号量个数大于0,则当前信号量的计数会减1,然后该方法直接返回。
否则如果当前信号量个数等于0,则当前线程会被放入AQS的阻塞队列。
当其他线程调用了当前线程的interrupt()方法中断了当前线程时,则当前线程会抛出InterruptedException异常返回。下面看下代码实现。
由如上代码可知,acquire()在内部调用了Sync的acquireSharedInterruptibly方法,后者会对中断进行响应(如果当前线程被中断,则抛出中断异常)。
尝试获取信号量资源的AQS的方法tryAcquireShared是由Sync的子类实现的,所以这里分别从两方面来讨论。
先讨论非公平策略NonfairSync类的tryAcquireShared方法,代码如下。
如上代码先获取当前信号量值(available),然后减去需要获取的值(acquires),得到剩余的信号量个数(remaining),如果剩余值小于0则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。
如果剩余值大于0,则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。
另外,由于NonFairSync是非公平获取的,也就是说先调用aquire方法获取信号量的线程不一定比后来者先获取到信号量。
考虑下面场景,如果线程A先调用了aquire()方法获取信号量,但是当前信号量个数为0,那么线程A会被放入AQS的阻塞队列。
过一段时间后线程C调用了release()方法释放了一个信号量,如果当前没有其他线程获取信号量,那么线程A就会被激活,然后获取该信号量,但是假如线程C释放信号量后,线程C调用了aquire方法,那么线程C就会和线程A去竞争这个信号量资源。
如果采用非公平策略,由nonfairTryAcquireShared的代码可知,线程C完全可以在线程A被激活前,或者激活后先于线程A获取到该信号量,也就是在这种模式下阻塞线程和当前请求的线程是竞争关系,而不遵循先来先得的策略。
下面看公平性的FairSync类是如何保证公平性的。
该方法与acquire方法不同,后者只需要获取一个信号量值,而前者则获取permits个。
该方法与acquire()类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了acquireUninterruptibly获取资源时(包含被阻塞后),其他线程调用了当前线程的interrupt()方法设置了当前线程的中断标志,此时当前线程并不会抛出InterruptedException异常而返回。
该方法与acquire(int permits)方法的不同之处在于,该方法对中断不响应。
该方法的作用是把当前Semaphore对象的信号量值增加1,如果当前有线程因为调用aquire方法被阻塞而被放入了AQS的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量,下面看代码实现。
该方法与不带参数的release方法的不同之处在于,前者每次调用会在信号量值原来的基础上增加permits,而后者每次增加1。