java多线程之并发工具类CountDownLatch,CyclicBarrier和Semaphore 目录 CountDownLatch Semaphore CyclicBarrier 总结 CountDownLatch CountDownLatch允许一个或多个线程等待其他线程完成操作. 假设一个Excel文件有多个sheet,我们需要去记录每个sheet有多少行数据, 这时我们就可以
目录
- CountDownLatch
- Semaphore
- CyclicBarrier
- 总结
CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作。
假设一个Excel文件有多个sheet,我们需要去记录每个sheet有多少行数据,
这时我们就可以使用CountDownLatch实现主线程等待所有sheet线程完成sheet的解析操作后,再继续执行自己的任务。
执行结果:
WorkThread启动了,时间为1640054503027
WorkThread我要统计每个sheet的行数
sheetThread1启动了,时间为1640054503028
sheetThread2启动了,时间为1640054503029
sheetThread2执行完了,时间为1640054504031 sheet的行数为:6
sheetThread1执行完了,时间为1640054504031 sheet的行数为:44
WorkThread执行完了,时间为1640054505036
可以看到,首先WorkThread执行await后开始等待,WorkThread在等待sheetThread1和sheetThread2都执行完自己的任务后,WorkThread立刻继续执行后面的代码。
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。
当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。
由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。
用在多个线程时,只需要把这个CountDownLatch的引用传递到线程里即可。
我们继续根据上面的测试案例流程,一步一步的分析CountDownLatch 源码。
第一步看CountDownLatch的构造方法,传入一个不能小于0的int类型的参数作为计数器
看它的注释,说的非常清楚,Sync就是CountDownLatch的同步控制器了,而它也是继承了AQS,并且第3行注释说到使用了AQS的state去代表count值。
第二步就是工作线程调用await()方法
如果线程中断,抛出异常,否则开始调用tryAcquireShared(1),其内部类Sync的实现也非常简单,就是判断state也就是CountDownLatch的计数是否等于0,
如果等于0,则该方法返回1,第5行的if判断不成立,否则该方法返回-1,第5行的if判断成立,继续执行doAcquireSharedInterruptibly(1)。
这个方法其实就是去获取共享模式下的锁,获取失败就park住。正如我们测试案例中的WorkThread线程应该次数就被park住了,那么它又是何时被唤醒的呢?
下面就到countDown()方法了
tryReleaseShared(1)方法尝试去释放共享锁
在for循环中,先获取CountDownLatch的计数也就是当前state,如果等于0返回false,否则将state更新为state-1,并返回最新的state是否等于0。
因此在我们的测试案例中,我们需要调用两次countDown方法,才会将全局的state更新为0,然后继续执行doReleaseShared()方法。
LockSupport.unpark(s.thread),唤醒线程的方法被调用后,WorkThread线程就可以继续执行了。
至此我们简单分析了整个测试案例中CountDownLatch的代码流程。
Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,相当于一个并发控制器,构造的时候传入可供管理的信号量的数值,这个数值就是用来控制并发数量的,
每个线程执行前先通过acquire方法获取信号,执行后通过release归还信号 。每次acquire返回成功后,Semaphore可用的信号量就会减少一个,如果没有可用的信号,
acquire调用就会阻塞,等待有release调用释放信号后,acquire才会得到信号并返回。
下面我们看个测试案例
执行结果:
Thread-0获得了信号量>>>>>,时间为1640058647604
Thread-1获得了信号量>>>>>,时间为1640058647604
Thread-2获得了信号量>>>>>,时间为1640058647604
Thread-3获得了信号量>>>>>,时间为1640058647605
Thread-4获得了信号量>>>>>,时间为1640058647605
Thread-0释放了信号量<<<<<,时间为1640058648606
Thread-1释放了信号量<<<<<,时间为1640058648606
Thread-5获得了信号量>>>>>,时间为1640058648607
Thread-4释放了信号量<<<<<,时间为1640058648607
Thread-3释放了信号量<<<<<,时间为1640058648607
Thread-7获得了信号量>>>>>,时间为1640058648607
Thread-8获得了信号量>>>>>,时间为1640058648607
Thread-2释放了信号量<<<<<,时间为1640058648606
Thread-6获得了信号量>>>>>,时间为1640058648607
Thread-9获得了信号量>>>>>,时间为1640058648607
Thread-7释放了信号量<<<<<,时间为1640058649607
Thread-6释放了信号量<<<<<,时间为1640058649607
Thread-8释放了信号量<<<<<,时间为1640058649607
Thread-9释放了信号量<<<<<,时间为1640058649608
Thread-5释放了信号量<<<<<,时间为1640058649607
我们使用for循环同时创建10个线程,首先是线程 0 1 2 3 4获得了信号量,再后面的10行打印结果中,线程1到5分别释放信号量,相同线程间隔也是1000毫秒,然后线程5 6 7 8 9才能继续获得信号量,而且保持最大获取信号量的线程数小于等于5。
看下Semaphore的构造方法
它支持传入一个int类型的permits,一个布尔类型的fair,因此Semaphore也有公平模式与非公平模式。
第9行代码可见Semaphore也是通过AQS的state来作为信号量的计数的
第12行 getPermits() 方法获取当前的可用的信号量,即还有多少线程可以同时获得信号量
第15行nonfairTryAcquireShared方法尝试获取共享锁,逻辑就是直接将可用信号量减去该方法请求获取的数量,更新state并返回该值。
第24行tryReleaseShared 方法尝试释放共享锁,逻辑就是直接将可用信号量加上该方法请求释放的数量,更新state并返回。
再看下Semaphore的公平锁
看尝试获取共享锁的方法中,多了个 if (hasQueuedPredecessors) 的判断,在java多线程6:ReentrantLock,
分析过hasQueuedPredecessors其实就是判断当前等待队列中是否存在等待线程,并判断第一个等待的线程(head.next)是否是当前线程。
CyclicBarrier
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
一组线程同时被唤醒,让我们想到了ReentrantLock的Condition,它的signalAll方法可以唤醒await在同一个condition的所有线程。
下面我们还是从一个简单的测试案例先了解下CyclicBarrier的用法
执行结果:
Thread-1开始, 时间为1640069673534
Thread-0开始, 时间为1640069673534
CyclicBarrier的barrierAction开始运行, 时间为1640069679536
Thread-1结束, 时间为1640069679536
Thread-0结束, 时间为1640069679536
可以看到Thread-0和Thread-1同时运行,而自定义的线程barrierAction是在6000毫秒后开始执行,说明Thread-0在await之后,等待了3000毫秒,和Thread-1一起继续执行的。
看下CyclicBarrier 的一个更高级的构造函数
parties就是设定需要多少线程在屏障前等待,只有调用await方法的线程数达到才能唤醒所有的线程,还有注意因为使用CyclicBarrier的线程都会阻塞在await方法上,所以在线程池中使用CyclicBarrier时要特别小心,如果线程池的线程过少,那么就会发生死锁。
Runnable barrierAction用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
首先是ReentrantLock加锁,全局的count值-1,然后判断count是否等于0,如果不等于0,则循环,condition执行await等待,直到触发、中断、中断或超时,如果count值等于0,先执行barrierAction线程,然后condition开始唤醒所有等待的线程。
简单是使用之后,有人会觉得CyclicBarrier
和CountDownLatch
有点像,其实它们两者有些细微的差别:
1:CountDownLatch
是在多个线程都进行了latch.countDown()
后才会触发事件,唤醒await()在latch上的线程,而执行countDown()的线程,是不会阻塞的;
CyclicBarrier
是一个栅栏,用于同步所有调用await()方法的线程,线程执行了await()方法之后并不会执行之后的代码,而只有当执行await()方法的线程数等于指定的parties之后,这些执行了await()方法的线程才会同时运行。
2:CountDownLatch
不能循环使用,计数器减为0就减为0了,不能被重置;CyclicBarrier本是就是支持循环使用parties,而且提供了reset()方法,可以重置计数器。
总结
本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注我们的更多内容!