Nivelle 开拓视野冲破艰险看见世界 身临其境贴近彼此感受生活

多线程学习(二)之同步容器

2017-05-27
nivelle

对象的组合

设计线程安全的类

设计线程安全类过程中,需要包含下面三个基本要素

  • 找出构成对象状态的所有变量
  • 找出约束状态变量的不变性条件
  • 建立对象状态的并发访问管理策略

如果在对象的域中引用了其他对象,那么该对象的状态将包含被引用对象的域。

如果在一个不变性条件中包含多个变量,那么在执行任何访问相关变量的操作时,都必须持有保护这些变量的锁。

实例封闭

封装简化了线程安全类的实现过程,它提供了一种实例封闭机制,通常也简称为“封闭”。当一个对象被封装到另一个对象中时,能够访问被封装对象的所有代码路径都是已知的。

将数据封装在对象内部,可以将数据的访问现在对象的方法上,从而更容易确保线程在访问数据时总能持有正确的锁。


基础构建模块

同步容器类
  • 包括Vector和Hashtable,这些同步容器类是由Collections.synchronizedXxx等工厂方法创建的。这些类实现线程安全的方式是:将他们的状态封装起来,并对每个共有方法进行同步,使得每次只有一个线程能访问容器的状态。
  • 同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。复合操作包括:迭代、跳转、条件运算
  • 由于同步容器类要遵守同步策略,即支持客户端加锁,因此可能会创建一些新的操作,只要我们知道应该使用哪一个锁,那么这些操作就与容器的其他操作一样都是原子操作。同步容器类通过其自身的锁来保护它的每个方法。通过获得容器类的锁,我们可以使getLast和deleteLast成为原子操作.如下:
public static Object getLast(Vector list){
    synchronized(list){
        int lastindex = list.size()-1;
        return list.get(lastindex);
    }
}

public static void deletelast(Vector list){
    synchronized(list){
        int lastIndex = list.size() - 1;
        list.remove(lastIndex);
    }
}
隐藏迭代器

编译器将字符串的链接操作转换为调用StringBuilder.append(Object),而这个方法又会调用容器的toString方法将迭代容器,并在每个元素上调用toString来生成容器内容的格式化表示。

public class HiddenIterator{
    @GuardedBy("this")
    private final Set<Integer>set = new HashSet<Integer>();
    
    public synchronized void add(Integer i){set.add(i);}
    
    public synchronized vod remove(Integer i){set.remove(i);}
    
    public void addTenThings(){
        Random r = new Random();
        for(int i=0;i<10;i++)
           add(r.nextInt());
        System.out.println("DEBUG:added ten elementsto"+set);
    }
}

容器的hashCode和equals等方法也会间接地执行迭代操作,当容器作为另一个容器的元素或者键值时,就会出现这种情况。同样,containsAll、removeALL和retainALL等方法,以及把容器作为参数的构造函数,都会对容器进行迭代。所有这些间接的迭代操作都可能抛出ConcurrentModificationException。

并发容器

同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性。这种方法的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重减低。

Java 5.0中增加了ConcurrentHashMap,用来代替同步且基于散列的Map,以及CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的List。

  • ConcurrentHashMap: 同步容器类在执行每个操作期间都持有一个锁。并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁。这种机制中,任意数量的读取线程可以并发地访问Map,执行读取操作的线程和执行写入操作的线程可以并发地访问Map,并且一定数量的写入线程可以并发地修改Map。这样在并发访问环境下实现更高吞吐量,而在单线程环境中只损失非常小的性能。

在ConcurrentHashMap中没有实现对Map加锁以提供单独访问。在Hashtable和synchronized-Map,获取Map的锁能防止其他线程访问这个Map.

  • CopyOnWriteArrayList: 用于代替同步List,提供更好的并发性能,并且在迭代期间不需要对容器进行加锁或者复制。“写入时复制(Copu-on-Write)”容器的线程安全性在于,只要正确地发布一个事实不可变得对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。它的迭代保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于不会被修改,因此在对其进行同步时只需要确保数组内容的可见性。因此,多个线程可以同时对这个容器进行迭代,而不会彼此感染或者与修改容器的线程互相干扰。每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时。

  • 阻塞队列和生产者-消费者模式:阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。BlockingQueue简化了生产者-消费者设计的实现过程,它支持任意数量的生产者和消费者。take操作会一直阻塞直到有可用的数据,put操作时,若使用有界队列,当队列充满时,生产者将阻塞并且不能继续生成工作。 阻塞队列同样提供了一个offer方法,如果数据项不能被添加到队列,那么将返回一个失败状态。这样就能够创建更多灵活的策略来处理负荷过载的情况,例如减轻负载,将多余的工作序列化并写入磁盘,减少生产者线程的数量。

在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抑制并防止生产过多的工作项,使用应用程序在符合过载的情况下边的更加健壮。

BlockingQueue的多种实现: LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,分别类似LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。PriorityBlockingQueue是一个按优先级排序的队列,当希望按照某种顺序排序时使用这个非常有用,而且它既可以根据元素的自然顺序来比较元素(实现了Comparable方法)也可以使用Comparator来比较。

  • 串行线程封闭

对于可变对象,生成者-消费者与阻塞队列一起,促进额串行线程封闭,从而将对象所有权交给消费者。线程封闭对象只能有单个线程拥有,但可以通过安全地发布该对象来转移所有权。在转移所有权之后,也只有另一个线程能获得这个对象的访问权限,并且发布对象的线程不会再访问他。

  • 双端队列与工作密取

java6新增Deque和BlockingDeque,分别对应Queue和BlockIngQueue进行拓展,但它是一个双端队列,实现了队列头部和队列尾部的高效插入和移除。具体实现包括ArrayDeque和LinkedBlcokDeque。双端队列对应的工作模式是:密取。在密取工作模式中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么他可以从其他消费双端队列末尾秘密地获取工作。

双端队列的优势:


1. 比传统的生产者-消费者模式具有更高的课伸缩性,因为工作线程不会在单个共享的任务队列上放生竞争。

2. 大多数时候,它们只是访问自己的双端队列,从而极大地减少了竞争

3. 当工作线程需要访问另个一个队列时,它会从队列的尾部而不是从头部获取工作,进一步降低了队列上的竞争程度。

工作密取非常适用于既是消费者也是生产者问题————当执行某个工作时可能导致出现更多的工作。

阻塞方法与中断方法

当线程阻塞时,它通常会被挂起,并处于某种阻塞状态(BLOCKED,WAITING,TIMED_WAINTING);当某个外部事件发生时,线程被置回RUNNABLE状态,才可以再次被调度执行。

中断的响应。 BlockingQueue的put和take方法会抛出受检异常(checked Exception)InterruptedException,这与类库中其他一些方法的做法相同,例如Thread.sleep。当某方法抛出InterruptedException时,表示该方法是一个阻塞方法,如果这个方法被中断,那么他将努力提前结束阻塞状态。

Thread 提供了Interrupt方法,用 于中断线程或者查询线程是否已经被中断。每个线程都有个布尔类型的属性,表示线程的中断状态,当中断线程时将设置这个状态。

中断时一种机制。一个线程不能强制其他线程停止正在执行的操作而去其他的操作。当线程A中断B时,A仅仅是要求B在执行到某个可以暂停的地方停止正在执行的操作————前提是如果线程B愿意停止下来。

当在代码中调用了一个将抛出InterruptedException异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须处理对中断的响应。有两种处理方式:

  • 传递InterruptedException。传递它的方法包括根本不捕获该异常或者捕获该异常,然后在执行某种简单的清理工作后再次抛出这个异常

  • 恢复中断 有时不能抛出InterruptedException,例如代码是Runnable的一部分时,这种情况下,必须捕获InterruptedException,并通过调用当前线程上的interrupt方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断。

public class TaskRunable impleents Runnable
{
BlockingQueue<Task> queue;

...
public void run()
  { 
   try{
       processTask(queue.take());
        }catch(InterruptedException e){
           Thread.currentThread().interrupt();
       }
  }
}

同步工具类
  • 在容器类中,阻塞队列时一种独特的类:它们不仅能作为保存对象的容器,还能协调生产者和消费者等线程之间的控制流,因为take和put等方法将阻塞,直到队列达到预期的状态。因此可以作为一种同步工具类

  • 闭锁:闭锁就像一扇门在闭锁到达结束状态之前,在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有线程通过。当闭锁到达结束状态后,将不会再改变状态,这扇门将会永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成成后才继续执行

CountDownLatch是一种灵活的闭锁实现,它可以使一个或者多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示已经有一个事件已经发生了,而await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞直到计数器为零,或者等待线程中断或者等待超时。

import java.util.concurrent.CountDownLatch;

public class TestHarness {
  public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
    final CountDownLatch startGate = new CountDownLatch(1);
    final CountDownLatch endGate = new CountDownLatch(nThreads);

    for (int i = 0; i < nThreads; i++) {
      Thread t = new Thread() {
        public void run() {
          try {
            startGate.await();
            try {
              task.run();
            } finally {
              endGate.countDown();
            }
          } catch (InterruptedException ignored) {
          }
        }
      };
      t.start();
    }

    long start = System.nanoTime();
    startGate.countDown();
    endGate.await();
    long end = System.nanoTime();
    return end - start;
  }
}

  • FutureTask: FutureTask实现了Future语义,表示一种抽象的可生成结果的计算。FutureTask 表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于一下3种状态:等待运行(waiting to run),正在运行(Running)和运行完成(Completed)。当FutureTask进入完成状态后,它会永远停止在这个状态上。

Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。

  • 信号量:计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池或者对容器施加边界。

Semaphore中管理着一组虚拟的许可(permit),许可的初始数量可以通过构造函数来指定。在执行操作时可以首先获得许可,并在使用以后释放许可。如果没有许可,那么acquire将阻塞直到有许可。release方法将返回一个许可给信号量。

信号量的使用,将容器设置为有界阻塞容器,只允许添加信号量大小的数据到集合当中:

public class BoundedHashSet {
  private final Set<String>set;
  
  private final Semaphore sem;
  
  public BoundedHashSet(int bound){
    this.set = Collections.synchronizedSet(new HashSet<String>());
    sem = new Semaphore(bound);
  }
  
  public boolean add(String o) throws InterruptedException{
    sem.acquire();
    boolean wasAdded = false;
    try{
      wasAdded=set.add(o);
      return wasAdded;
    }finally{
      if(!wasAdded){
        sem.release();
      }
    }       
  }
  public boolean remove(Object o){
    boolean wasRemoved = set.remove(o);
    if(wasRemoved)
      sem.release();    
    return wasRemoved;
  }
}

  • 栅栏:类似于闭锁,他能阻塞一组线程直到某个事件发生。它与闭锁的主要区别是所有线程必须同时达到栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在冰箱迭代算法中非常有用:这种算法通常将一个问题拆分成一些列互相独立的子问题。当线程达到栅栏位置时调用await方法,阻塞到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏打开,此时所有线程释放,而栅栏将被重置以便下次使用。

若对await调用超时,或者await阻塞的线程被打断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException.


内容来自《java并发编程实战》 作者:Brian Goetz  Tim Peierls


评论