Nivelle 开拓视野冲破艰险看见世界 身临其境贴近彼此感受生活

多线程学习(三)之Executor

2017-06-03
nivelle

任务执行

image

任务是一组逻辑工作单元,而线程则是使任务异步执行的机制

Executor框架:

线程池简化了线程的管理工作,并且java.util.concurrent提供了一种灵活的线程池实现作为Executor框架的一部分。在java类库中,任务执行的主要抽象不熟Thread,而是Executor。Executor接口如下:

public interface executor{
    void execute(Runnable command);
}

它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。此外它还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。

Executor基于生产者——消费者模式,提交任务的操作相当于生产者,执行任务的线程则相当于消费者。

示例:基于Executor的Web服务器

public class TaskExecutionWebServer {
    
  private static final int NTHREADS =100;
  private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
  
  public static void main(String args[]) throws IOException{
    ServerSocket socket = new ServerSocket(80);
    while(true){
      final Socket connection = socket.accept();
      Runnable task = new  Runnable() {
        public void run() {
          handleRequest(connection);
        }
      };
      exec.execute(task);
    }
  }
}

线程池

定义:指的是管理一组同构工作线程的资源池。线程池是与工作队列密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

类库中提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态工厂方法之一来创建一个线程池:

  • newFixedThreadPool。创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化。

  • newCachedThreadPool。创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。

  • newSingleThreadExecutor。是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另个一线程来代替。它能确保依照任务在队列中的顺序串行执行。

  • newScheduledThreadPool。创建一个固定长度的线程池,而且可以延迟或定时的方式来执行任务,类似于Timer.

线程池的大小

如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅导致更高的内存使用量,而且还可能耗尽资源。如果线程池过小,那么将导致许多空闲的处理器无法执行工作,从而降低吞吐量。

对于计算密集型的任务,在拥有N(cpu)个处理器的系统上,当线程池的大小为N+1时,通常能实现最优的利用率。对于包含IO操作或者其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。要正确地设置线程池的大小,你必须估算出任务的等待事件与计算时间的比值。

配置ThreadPoolExecutor

它为一些Executor提供了基本的实现,这些Executor是由Executors中的newCachedThreadPool,newFixedThreadPool和newScheduledThreadExecutor等工厂方法返回的。ThreadPoolExecutor是一个灵活的、稳定的线程池,允许进行各种定制。如果默认的执行策略不能满足需求,则可以根据情况自己定制。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler){}

线程的创建与销毁

基本大小:Core Pool Size 也就是目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程

最大大小:Maximum Pool Size 最大大小,表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小是,这个线程将被终止

通过调节线程池的基本大小和存活时间,可以帮助线程池回收空闲线程占有的资源,从而使得这些资源可以用于执行其他工作。

newFixedThreadPool 工厂方法将线程的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。newCachedThreadPool工厂方法将线程池最大大小设置为Integer.MAX_VALUE,而将基本大小设置为零,并将超时设置为1分钟,这种方法创建出来的线程池可以被无限拓展,并且当需求降低是会自动收缩。其他形式的线程池可以通过显示的ThreadPoolExecutor。

管理队列任务

在有限的线程池中会限制可并发执行的任务数量、(单线程的Executor是一种值得注意的特例:它们能确保不会有任务并发执行,因为它们通过线程封闭来实现线程安全性)

ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务。基本的任务排队方法有3类:无界队列,有界队列和同步移交。队列的选择和其他配置参数有关。

  • newFixedThreadPool和newSingleThradExecutor在默认的情况下将使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制地增加。

一种更加稳妥的方式是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlickingQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况发生。

对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchrousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。将一个元素放入SynchrounousQueue中,必须有另一个线程在等待接受这个元素。如果没有线程等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutr将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。

只有当线程池是无界或者可以拒绝任务时,SynchronousQueue才有实际价值,在NewCachedThreadPool工厂方法中就使用了SynchronousQueue。

Executor的生命周期

Executor的实现通常会创建线程来执行任务。但JVM只有在所有线程全部终止后才会退出。因此,如果无法正确地关闭Executor,那么JVM将无法结束。

Executor以异步方式执行任务,因此在任何时刻,之前提交任务状态不是立即可见的。关闭应用程序时,可能采用平缓的关闭形式(完成所有已经启动的任务,并且不再接受新的任务。)

为了解决执行服务的生命周期问题,Executor拓展了ExecutorService接口,添加了一些用于生命周期管理的方法。如下:

public interfact Executorservice extends  Executor{
  void shutdown();
  List<Runable> shutdownNow();
  boolean isShutdown();
  boolean isTerminated();
  boolean awaitTermination(long timeout,timeUnit unit) throws InterruptedExeception;
}

ExecutorService 的生命周期有3种状态:运行、关闭和终止。在创建初期处于运行状态。shutdown方法将执行平缓的关闭的过程:不再接受新的任务,同时等待已经提交的任务执行完成————包括还未开始执行的任务。shutsownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

延迟任务和周期任务

Timer类负责管理延迟任务,以及周期任务。然而,Timer存在一些缺陷,用ScheduleThreadPool来代替它。可用通过构造函数或者newScheduleThreadPool工厂方法来创建该类的对象,

Timer的劣势:

  • Timer在执行所有定时任务时只会创建一个线程。如果某个任务执行时间太长,那么将破坏其他TimerTask的定时精确性。

  • Timer线程并不捕获异常,因此当TimerTask抛出未检查的异常时将终止定时线程。这种情况下Timer也不会恢复线程的执行,而是错误的认为整个Timer都被取消了。因此已经被调度但尚未执行的TimerTask将不再执行,新的任务也不能被调度。

携带结果的任务Callable与Future

Callable:它认为主入口点(call)将返回一个值,并可能抛出一个异常。在Executor中包含了一些辅助方法能将其他类型的任务封装为一个Callable,例如:Runnable和java.security.PrivilegedAction。

Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及任务获取任务的结果和取消任务等。get方法的行为取决于任务的状态(尚未开始、正在运行、已经完成)。如果任务已经完成,那么get会立即返回或者抛出一个Exeception,如果任务没有完成,那么get将阻塞并直到任务完成。如果抛出异常,那么get将该异常封装成为ExecutionException并重新抛出。如果任务呗取消,那么get将抛出CancleationExeception。如果get抛出了ExecutionExecuption异常,那么可以通过getCause来获得被封装的初始异常。

public interface Callable<V>{
  V call() throws Exeception;
}


public interface Future<V>{
  boolean cancle(boolean mayInterruptIfRunning);
  boolean isCancelled();
  boolean isDone();
  V get() throws InterruptedExeception,ExecutionExeception,CancellationException;
  V get(long timeout,TimeUnit unit) throws InterruptedExeception,ExecutionExeception,CancellationException;
}

可以通过许多种方法创建一个Future来描述任务。ExecutorService中的submit方法都将返回一个Future,从而将一个Runnable或者Callable提交给Executor,并得到一个Future用来获得任务的执行结果或者取消任务,还可以显示地为某个指定的Runnable或Callable实例化一个FutureTask。

protected<T> RunnableFuture<T> newTaskFor(Callable<T> task){
  return new FutureTask<T>(task);
}


任务的取消和关闭

任务取消

java中没有一种安全的抢占式方法来停止线程,因此也就没有安全的抢占式方法来停止任务。只有一些协作式的机制,使请求取消的任务和代码都遵循一种协商好的协议。

  • 其中一种协作机制设置某个“已请求取消”标志,而任务将定期查看该标志。如果设置了这个标志,那么任务将提前结束。

public class PrimeGenerator implements Runnable {
  private final List<BigInteger> primes = new ArrayList<BigInteger>();
  
  private volatile boolean cancelled;
  
  public void run(){
    BigInteger p =BigInteger.ONE;
    while (!cancelled) {
       p = p.nextProbablePrime();
       synchronized (this) {
         primes.add(p);
      }      
    }
  }
  
  public void cancel(){cancelled =true;}
  
  public synchronized List<BigInteger> get(){
    return new ArrayList<BigInteger>(primes);
  }
  
  //使用示例,素数生成1秒后取消
  
  List<BigInteger> aSecondOfPrimes() throws InterruptedException{
    PrimeGenerator generator = new PrimeGenerator();
    new Thread(generator).start();
    try{
      SECONDS.sleep(1);
    }finally{
      generator.cancel();
    }
    return generator.get();
  }
}


PrimeGenerator 使用了一种简单的取消策略:客户端代码通过调用cancel来请求取消,PrimeGenerator在每次搜索素数前首先检查在取消请求,如果存在则退出。

中断

每个线程都有一个boolean类型的中断状态。当中断线程时,这个线程的中断状态将被设置为true。在Thread 中包含了中断线程以及查询线程中断状态的方法。Interrupt方法能中断目标线程,而isInterrupted方法能返回目标线程的中断状态。静态的interrupted方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。


public class Thread{
  
  public void interrupt(){...}

  public boolean isInterrupted(){...}

  public static boolean interrupted(){...}
}


阻塞库方法,例如Thread.sleep和Object.wait 等,都会检查线程何时中断,并且在发现中断时提起返回。它们在响应中断时执行操作包括:清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提起结束。JVM并不能保证阻塞方法检测到中断的速度,但实际情况中响应速度还是非常快的。

当线程在非阻塞状态下中断时,它的中断状态将被设置,然后根据将被取消的操作来检查中断状态以判断发生了中断。如果不触发InterruptedException,那么中断状态一直保持,直到明确地清除中断状态。

调用interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。

对中断操作的正确理解是:它并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己。wait、sleep、和join等,将严格处理这种请求,当它们收到中断请求或者在开始时发现某个已经被设置好的中断状态时,将抛出一个异设计良好的代码,它们能使用调用代码对中断请求进行某种处理。

在使用静态interrupted时应该小心,因为它会清除当前线程的中断状态,如果在调用Interrupted时返回了true,那么除非你想屏蔽这个中断,否则必须对它进行处理————可以抛出InterruptedException,或者通过再次调用interrupt来恢复中断状态。

通常,中断是实现取消的最合理方式。

public class PrimeProducer extends Thread{
   
  private final BlockingQueue<BigInteger> queue;
  
  public PrimeProducer(BlockingQueue<BigInteger> queue) {
     this.queue = queue;
  }
 
  public void run(){
    try {
      BigInteger p = BigInteger.ONE;
          while(!Thread.currentThread().isInterrupted()){
             queue.put(p = p.nextProbablePrime());
          }
    } catch (InterruptedException consumed) {
       //允许线程退出
    }
  }
  
  public void cancel(){interrupt();}
}


中断策略

中断策略规定线程如何解释某个中断请求————当发现中断请求时,应该做哪些工作,哪些工作单元对于中断来说是原子操作,以及以多块的速度来响应中断。

最合理的中断策略是某种形式的线程级取消操作或者服务级取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。

任务不会再其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。对于非线程所有者代码来说,应该小心保存中断状态,这样拥有线程的代码才能对中断做出响应,即使“非所有者”代码也可以做出响应。

当检测到中断请求时,任务并不需要放弃所有的操作————它可以推迟处理中断请求,并直到某个更合适的时刻。

响应中断

当调用可可中断的阻塞函数时,例如Thread.sleep或Blocking.put等,有两种实用策略可以处理InterruptedException:

  • 传递异常:可能在执行某个特定任务的清除操作之后,从而使你的方法也可以成为可中断的阻塞方法。

  • 恢复中断状态:从而使调用栈中的上层代码能够对其进行处理。

如果不想或者无法传递InterruptedException,那么需要寻找另一种方式来保存中断请求。一种标准的方法就是通过再次调用Interrupt来恢复中断状态。

只有实现了线程中断策略的代码才可以屏蔽中断请求。在常规的任务和库代码中都不应该屏蔽中断请求。

通过Future来实现取消

ExecutorService.submit将返回一个Future来描述任务。Future拥有一个cancel方法,该方法带有一个boolean类型的参数mayInterruptIfRunning,表示取消操作是否成功。(这只是表示任务是否能够接收中断,而不是表示任务是否能检测并被处理)如果mayInterruptIfRunning为true并且任务当前正在某个线程中运行,那么这个线程能被中断。如果这个参数为false,那么意味着“若任务还没有启动,就不要运行它”,这种方式应该用于那些 不处理中断的任务中。

当Future.get 抛出InterruptedException或者TimeoutException时,如果你知道不再需要结果,那么就可以调用Future.cancel来取消任务。

内容来自《java并发编程实战》 作者:Brian Goetz  Tim Peierls


评论