本文共 5296 字,大约阅读时间需要 17 分钟。
CyclicBarrier:循环栅栏。这是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时的CyclicBarrier很有用。因为该Barrier在释放等待线程后可重用,所以称为循环的barrier。
我们已经看到通过闭锁来启动一组相关的操作,或等待一组相关的操作结束。闭锁是一次性对象,一旦进入终止状态,就不能被重置。
栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于:所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:“所有人6:00在麦当劳碰头,到了之后要等其他人,之后讨论下一步要做的事情。”
CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并将抛出BrokenBarrierException。
通俗的例子就是一帮人(子线程)去旅游,我们规定了几个时间点(栅栏),中途大家可以随便按路线参观,但是必须9点到达,开始参观,必须12点集合吃饭。如,张三,李四,王五,赵六,分别8:10,8:15,8:20,8:25到达,此时barrier关闭,虽然大家到了也不能开始,必须景点开门,也就是到达了9:00(可能比喻不恰当,实际上线程都到之后就可以了),栅栏开启,大家去参观天安门,参观之后大家到达全聚德,比如张三参观的快,10点就看完了,虽然去了全聚德,依然不能吃饭,因为此时barrier又关闭了,必须所有人同时到达,栅栏打开,大家开始吃,至于谁吃的快,谁吃的慢这就无所谓了。
package com.mylearn.thread; import java.util.concurrent.*; /** * Created by IntelliJ IDEA. * User: yingkuohao * Date: 13-7-15 * Time: 下午3:54 * CopyRight:360buy * Descrption: * CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点(common barrier point)。 * 在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier很有用。因为该barrier在 * 释放等待线程后可以重用,所以称它为循环的barrier。 * <p/> * CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前), * 该命令可在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。 * To change this template use File | Settings | File Templates. */ public class CyclicBarrierTest { public static void main(String args[]) { ExecutorService executorService = Executors.newCachedThreadPool(); CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() { public void run() { System.out.println("饭后来一颗益达"); //后续线程执行 } }); Future future = executorService.submit(new Task(cyclicBarrier)); executorService.submit(new Task(cyclicBarrier)); executorService.shutdown(); System.out.println("mainfangfa "); } private static class Task implements Runnable { private CyclicBarrier cyclicBarrier; private Task(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } public CyclicBarrier getCyclicBarrier() { return cyclicBarrier; } public void setCyclicBarrier(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } public void run() { System.out.println(Thread.currentThread().getName() + "吃早饭"); //执行子任务,然后wait try { cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "吃午饭"); //执行子任务,然后wait cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "吃晚饭"); //执行子任务,然后wait cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } } |
构造方法:
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } |
Wait:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } |
Dowait:
/** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); private static class Generation { boolean broken = false; } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); //加锁 try { final Generation g = generation; //如果如果对await调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并将抛出BrokenBarrierException。 if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //count-- if (index == 0) { // tripped boolean ranAction = false; try { //如果index=0,说明所有线程到达,栅栏打开,执行后续的线程 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); //打开栅栏,重新初始化进行下一次循环 return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) //传入的timed是false trip.await(); //线程挂起,等待栅栏打开时唤醒 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); //如有超时,等待超时时间 } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) //如果被打断,抛异常 throw new BrokenBarrierException(); if (g != generation) //如果成功通过栅栏,将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中又该线程执行一些特殊的工作。 return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } |
nextGeneration:
private void nextGeneration() { // signal completion of last generation,//唤醒所有阻塞的线程 trip.signalAll(); // set up next generation,重设初始值,准备下一次复用 count = parties; generation = new Generation(); } |
转载地址:http://nwrrb.baihongyu.com/