大数据 excel xamarin checkbox graph eking文件 抖音 Web Uploader vue下载 react脚手架 less用法 java数据分析 js基本数据类型有哪些 java 大文件上传 vue与html5 mysql删除表 python调用方法 python中re模块 java删除数组中的元素 java手册 java的泛型 java数组追加 java获取当前时间 获取当前时间java php连接mssql 千元以下最好的手机 咪咕客户端下载 html5网页制作 c语言表白代码 backtrack3 linux定时任务 红巨人插件 pmbok第六版 游戏linux正则表达式 apihook jq循环 极限防守图 语音转文字转换器 ps光照效果 ps取色
当前位置: 首页 > 学习教程  > 编程语言

《Java多线程编程实战指南(核心篇)》笔记(四)线程的活性故障

2020/8/31 15:35:21 文章标签:

文章目录

    • 一、线程的活性故障
      • 1.1 死锁
        • 1.1.1 死锁的产生条件
        • 1.1.2 死锁的规避
        • 1.1.3 死锁的恢复
      • 1.2 锁死
      • 1.3 线程饥饿
      • 1.4 活锁
    • 二、线程管理
      • 2.1 线程组
      • 2.2 可靠性:线程的未捕获异常与监控
      • 2.3 线程工厂
      • 2.4 线程的暂挂和恢复
      • 2.5 线程的高效利用:线程池
        • 2.5.1 任务的处理结果、异常处理与取消
        • 2.5.2 线程池监控
        • 2.5.3 线程池死锁
        • 2.5.4 工作者线程的异常终止

一、线程的活性故障

  线程活性故障是由于资源稀缺性或程序子很的问题和缺陷导致线程一直处于非RUNNABLE状态,或者线程虽然处于RUNNABLE状态,但是其要执行的任务却一直无法进展的故障现象。

1.1 死锁

  死锁是线程的一种常见活性故障。如果两个或更多的线程因互相等待对方而被永远暂停(线程的生命周期状态为BLOCKED或WAITING),那么就称这些线程产生了死锁
  由于产生死锁的线程的生命周期状态永远是非运行状态,因此这些线程要执行的任务也永远无法进展。死锁产生的一种典型情形是线程A在持有锁L1的情况下申请锁L2,而线程B在持有L2的情况下申请L1,A只有在获得并释放L2后才会释放L1,而B在只有获得并释放L1后才会释放L2。此时,A和B释放锁的前提都是先获得对方持有的另一个锁,因此这两个线程最终都无法获得它们申请的另一个锁,两个线程都处于无限等待的状态,即产生了死锁。

1.1.1 死锁的产生条件

  当线程产生死锁时,这些线程及相关的资源将满足如下全部条件:

  • 1、资源互斥
      涉及的资源必须是独占的,即每个资源一次只能被一个线程使用。
  • 2、资源不可抢夺
      涉及的资源只能被其持有者(线程)主动释放,而无法被资源的持有者和申请者之外的第三方线程抢夺。
  • 3、占用并等待资源
      涉及的线程当前至少持有一个资源(资源A)并申请其他资源(资源B),而这些资源(资源B)恰好被其他线程持有。在这个资源等待的过程中,线程并不释放其已经持有的资源。
  • 4、循环等待资源
      涉及的线程必须在等待别的线程持有的资源,而这些线程又反过来在等待第1个线程所持有的资源。
      这些条件是死锁产生的必要条件而非充分条件,也就是说只要产生了死锁,那么上面的这些条件一定同时成立,但是上述条件即便同时成立也不一定产生死锁。
      如果把锁看作一种资源,这种资源正好符合“资源互斥”和“资源不可抢夺”的要求。那么,可能产生死锁的特征就是在持有一个锁的情况下去申请另外一个锁,通常是锁的嵌套,比如以下代码:

      如果一个线程在已经持有一个锁的情况下再次申请这个锁(如果,一个类中的一个同步方法调用该类的另外一个同步方法)并不会导致死锁,因为锁是可重入的,该情况下再次申请这个锁是可以成功的。

1.1.2 死锁的规避

  由上文可知,要产生死锁需要同时满足四个条件,所以,只要打破其中一个条件就可以避免死锁的产生。常用的规避方法有如下几种:

  • 1、粗锁法
      改用一个粒度较粗的锁替代原来的多个粒度较细的锁,这样涉及的线程都只需要申请一个锁从而避免了死锁。粗锁法的缺点是它明显降低了并发性并可能导致资源浪费。
  • 2、锁排序法
      相关线程使用全局统一的顺序申请锁。假设有多个线程需要申请锁(资源),那么只需要让这些线程依照一个全局(相对于使用这种资源的所有线程而言)统一的顺序去申请这些资源,就可以消除“循环等待资源”这个条件,从而规避死锁。一般,可以使用对象的hashcode作为资源的排序依据。
  • 3、使用ReentrantLock.tryLock(long timeout, TimeUnit unit) 申请锁
      ReentrantLock.tryLock(long timeout, TimeUnit unit) 允许为申请锁这个操作加上一个超时时间。在超时事件额你,如果相应的锁申请成功,该方法返回true。如果在tryLock执行的那一刻相应的锁正在被其他线程持有,那么该方法会使当前线程暂停,直到这个锁申请成功(此时该方法返回true)或者等待时间超过指定的超时时间(此时该问题返回false)。因此,使用ReentrantLock.tryLock(long timeout, TimeUnit unit) 来申请锁可以避免一个线程无限制地等待另外一个线程持有的资源,从而最终能够消除死锁产生的必要条件中的“占用并等待资源”。
  • 4、使用开放调用----在调用外部方法时不加锁
      开放调用是一个方法在调用方法(包括其他类的方法以及当前类的可覆盖方法)的时候不持有任何锁。显然,开放调用能够消除死锁产生的必要条件中的“持有并等待资源”。
  • 5、使用锁的替代品
      使用一些锁的替代品(无状态对象、线程特有对象以及volatile关键字等),在条件允许的情况下,使用这些替代品在保障线程安全的前提下不仅能够避免锁的开销,还能够直接避免死锁。

1.1.3 死锁的恢复

  如果代码中使用的是内部锁或使用的是显式锁而锁的申请是通过Lock.lock()调用实现的,那么这些所的使用所导致的死锁恢复是不可恢复的,此时唯一能做的是重启虚拟机。如果代码中使用的是显式锁且锁的申请是通过Lock.lockInterruptibly()调用实现的,那么这些锁的使用所导致的死锁理论上是可恢复的。但是,死锁的恢复实际可操作性并不强----进行恢复的尝试可能是徒劳的(故障线程可能无法响应中断)且有害的(可能导致其他线程活性故障)。

  导致死锁的线程也具有一定的不可控性(比如第三方软件启动的线程),因此死锁恢复的实际可操作性并不强。

  死锁的自动恢复有赖于线程的中断机制,其基本思想是:定义一个工作者线程,专门用于死锁检测和恢复。该线程定期检测系统中是否存在死锁,若检测到死锁,则随机选择一个死锁线程并给其发送中断。该中断使得一个任意的死锁线程(目标线程)被虚拟机唤醒,从而使其抛出InterruptedException异常,这使得目标线程不再等待它本来永远也无法申请到的资源。目标线程捕获到InterruptedException异常后,会将其已经持有的资源主动释放掉。该线程一直持续此过程,直到系统中不再存在死锁。示例代码如下:

public class DeadlockDetector extends Thread {
	static final ThreadMXBean tmb = ManagementFactory.getThreadMXBean();
	/**
	 * 检测周期(单位为毫秒)
	 */
	private final long monitorInterval;

	public DeadlockDetector(long monitorInterval) {
	    super("DeadLockDetector");
	    setDaemon(true);
	    this.monitorInterval = monitorInterval;
	}

	public DeadlockDetector() {
	    this(2000);
	}

	public static ThreadInfo[] findDeadlockedThreads() {
	    long[] ids = tmb.findDeadlockedThreads();
	    return null == tmb.findDeadlockedThreads() ? new ThreadInfo[0] : tmb.getThreadInfo(ids);
	}

	public static Thread findThreadById(long threadId) {
	    for (Thread thread : Thread.getAllStackTraces().keySet()) {
	    	if (thread.getId() == threadId) {
	    		return thread;
	    	}
	    }
	    return null;
	}

	public static boolean interruptThread(long threadID) {
	    Thread thread = findThreadById(threadID);
	    if (null != thread) {
	    	thread.interrupt();
	    	return true;
	    }
	    return false;
	}

	@Override
	public void run() {
	    ThreadInfo[] threadInfoList;
	    ThreadInfo ti;
	    int i = 0;
	    try {
	    	for (;;) {
	    		// 检测系统中是否存在死锁
	    		threadInfoList = DeadlockDetector.findDeadlockedThreads();
	    		if (threadInfoList.length > 0) {
	    			// 选取一个任意的死锁线程
	    			ti = threadInfoList[i++ % threadInfoList.length];
	    			Debug.error("Deadlock detected,trying to recover" + " by interrupting%n thread(%d,%s)%n",
	    					ti.getThreadId(), ti.getThreadName());
	    			// 给选中的死锁线程发送中断
	    			DeadlockDetector.interruptThread(ti.getThreadId());
	    			continue;
	    		} else {
	    			Debug.info("No deadlock found!");
	    			i = 0;
	    		}
	    		Thread.sleep(monitorInterval);
	    	}
	    } catch (InterruptedException e) {
	    }
	}
}

  由上述代码可知,DeadlockDetector的实现是通过java.lang.management.ThreadMXBean.findDeadlockedThreads()调用来实现死锁检测的,该方法会返回一组死锁线程的线程编号。
  首先,死锁的自动恢复有赖于死锁的线程能够响应中断。如果在开发过程中意识到某些代码有可能产生死锁,就应该去规避死锁而不是使其支持死锁的自动恢复。其次,自动恢复尝试有可能导致新的问题。如果出问题的线程对中断的响应方式仅仅是保留中断标记而不释放其已持有的资源,可能会造成相应的线程一直在尝试申请锁而一直无法申请成功,即产生活锁。

1.2 锁死

  等待线程由于唤醒其所需的条件永远无法成立,或者其他线程无法唤醒这个线程而一直处于非运行状态(线程并未终止)导致其任务一直无法进展,就成这个线程被锁死
  按照锁死产生的条件来分,锁死包括信号丢失锁死嵌套监视器锁死

  • 1、信号丢失锁死
      该情况指的是由于没有相应的通知线程来唤醒等待线程而使等待线程一直处于等待状态的一种活性故障。
      信号丢失锁死的两个常见例子:
       1>等待线程在执行Object.wait()/Condition.await()前没有对保护条件进行判断,而此时保护条件实际上可能已经成立,此后可能并无其他线程更新相应保护条件涉及的共享变量使其成立并通知等待线程,这就使得等待线程一直处于等待状态。
       2>CountDownLatch.countDown()调用没有放在finally块中导致CountDownLatch.await()的执行线程一直处于等待状态,从而使其任务一直无法进展。
  • 2、嵌套监视器锁死
      嵌套监视器锁死是嵌套锁导致等待线程永远无法被唤醒的一种活性故障。假如有如下代码:

      等待线程在其执行到monitorY.wait()的时候会被暂停且其所持有的锁monitorY会被释放,但是等待线程所持有的的外层锁monitorX并不会因此而被释放。通知线程在调用monitorY.notifyAll()来唤醒等待线程时需要持有相应的锁monitorY,但是由于monitorY所引导的临界区位于monitorX引导的临界区之内,因此通知线程必须先持有外层锁monitorX。
      通知线程执行通知方法的时候,其所申请的monitorX可能正好被等待线程所持有,因此通知线程无法唤醒等待线程。而等待线程只有被唤醒之后才能释放其持有的外层锁monitorX。于是,通知线程始终无法获得锁monitorX,从而无法通过monitorY.notifyAll()调用来唤醒等待线程,这使得等待线程一直处于非运行状态(此处是BLOCKED状态)。这种由于嵌套锁导致通知线程无法唤醒等待线程的活性故障被称为嵌套监听器锁死

1.3 线程饥饿

  线程饥饿是指一直无法获得其所需的资源而导致任务一直无法进展的一种活性故障。
  线程饥饿的一个典型例子是在争用的情况下使用非公平模式的读写锁。此种情况下,可能会导致某些线程一直无法获取其所需的资源(锁),即导致线程饥饿。
  把锁看作一种资源的话,其实死锁也是一种线程饥饿。死锁的结果是故障线程都无法获得其所需的全部锁中的一个锁,从而使其任务一直无法进展,这相当于线程无法获得其所需的全部资源(锁)而使得其任务一直无法进展,即产生了线程饥饿。由于线程饥饿的产生条件是一个(或多个)线程始终无法获得其所需的资源,显然这个条件的满足并不意味着死锁的必要条件*(而不是充分条件)的满足,因此线程饥饿并不会导致死锁。
  线程饥饿涉及的线程,其生命周期不一定就是WAITING或BLOCKED状态,其状态也可能是RUNNING(说明涉及的线程一直在申请宁其所需的资源),这时饥饿就会演变成活锁。

1.4 活锁

  活锁指线程一直处于运行状态,但是其任务却一直无法进展的一种活性故障。

二、线程管理

  之前的内容,都是介绍如歌利用线程做到想要做的事情,本章节介绍如何“做得更好”。在本章中,会介绍的问题包括:线程在其运行过程中一旦抛出了未捕获异常,如何得知并应对的可靠性问题;如何将线程的创建于配置(比如设置优先级)以一种统一的方式管控起来,使这些线程从“散兵游勇”提升为“正规军”的问题;如何提高线程这种宝贵的资源的利用率问题。

2.1 线程组

  线程组(ThreadGroup类)可以用来表示一组相似(相关)的线程。线程与线程组之间的关系类似于文件与文件夹之间的关系,一个线程组可以包含多个线程及其他线程组。一个线程组包含其他线程组的时候,该线程组称为这些线程组的父线程组。Thread类中有几个构造方法允许开发者在创建线程的时候指定线程所属的线程组。如果创建线程的时候没指定线程组,那么这个线程就属于其父线程(即当前线程)所属的线程组。某个线程所属的线程组可以通过Thread.getThreadGroup()来获取。

  多数情况下,可以忽略线程组这一概念以及线程组的存在。

2.2 可靠性:线程的未捕获异常与监控

  如果线程的run方法抛出未被捕获的异常,那么随着run方法的退出,相应的线程也提前终止。在Thread类内部定义了一个UncaughtExceptionHandler接口,该接口中只有一个方法:

	void uncaughtException(Thread thread, Throwable ex);

  uncaughtException方法的两个参数是异常终止的线程本身以及导致线程提前终止的异常。在该方法中,可以做一些事情,比如将异常终止的信息记录到日志文件中,甚至为异常终止的线程创建并启动一个替代线程。示例代码如下:

public class ThreadMonitorDemo {
	volatile boolean inited = false;
	static int threadIndex = 0;
	final static Logger LOGGER = Logger.getAnonymousLogger();
	final BlockingQueue<String> channel = new ArrayBlockingQueue<String>(100);

	public static void main(String[] args) throws InterruptedException {
	    ThreadMonitorDemo demo = new ThreadMonitorDemo();
	    demo.init();
	    for (int i = 0; i < 100; i++) {
	    	demo.service("test-" + i);
	    }

	    Thread.sleep(2000);
	    System.exit(0);
	}

	public synchronized void init() {
	    if (inited) {
	    	return;
	    }
	    Debug.info("init...");
	    WokrerThread t = new WokrerThread();
	    t.setName("Worker0-" + threadIndex++);
	    // 为线程t关联一个UncaughtExceptionHandler
	    t.setUncaughtExceptionHandler(new ThreadMonitor());
	    t.start();
	    inited = true;
	}

	public void service(String message) throws InterruptedException {
	    channel.put(message);
	}

	private class ThreadMonitor implements Thread.UncaughtExceptionHandler {
	    @Override
	    public void uncaughtException(Thread t, Throwable e) {
	    	Debug.info("Current thread is `t`:%s, it is still alive:%s",Thread.currentThread() == t, t.isAlive());

	    	// 将线程异常终止的相关信息记录到日志中
	    	String threadInfo = t.getName();
	    	LOGGER.log(Level.SEVERE, threadInfo + " terminated:", e);

	    	// 创建并启动替代线程
	    	LOGGER.info("About to restart " + threadInfo);
	    	// 重置线程启动标记
	    	inited = false;
	    	init();
	    }

	}

	private class WokrerThread extends Thread {
	    @Override
	    public void run() {
	    	Debug.info("Do something important...");
	    	String msg;
	    	try {
	    		for (;;) {
	    			msg = channel.take();
	    			process(msg);
	    		}
	    	} catch (InterruptedException e) {
	    	}
	    }

	    private void process(String message) {
	      Debug.info(message);
	      // 模拟随机性异常
	      int i = (int) (Math.random() * 100);
	      if (i < 2) {
	    	  throw new RuntimeException("test");
	      }
	      Tools.randomPause(100);
	    }
	}
}

  测试结果中关键部分如下:

  线程组本身也实现了UncaughtExceptionHandler接口。如果一个线程没有关联的UncaughtExceptionHandler实例,那么该线程异常终止前所述线程组的uncaughtException方法会被调用。线程组的uncaughtException方法会调用其父线程组的uncaughtException方法并传递同样的两个参数(t和e)。如果一个线程组没有其父线程组(只有最顶层的线程组没有其父线程组,因此一个Java虚拟机中只有一个线程组没有其父线程组),那么该线程组的uncaughtException方法会调用默认UncaughtExceptionHandler的uncaughtException方法来处理线程的异常终止。
  默认UncaughtExceptionHandler适用于所有线程,即任何一个线程异常终止时默认UncaughtExceptionHandler都有可能会被调用。Thread.setUncaughtExceptionHandler(UncaughtExceptionHandler handler)方法可用来指定默认的UncaughtExceptionHandler。针对一个线程的异常终止,该线程所关联的UncaughtExceptionHandler实例、该线程所在的线程组以及默认UncaughtExceptionHandler之中只有一个UncaughtExceptionHandler实例被选中。UncaughtExceptionHandler实例的选择优先级如下:

2.3 线程工厂

  从JDK1.5开始,引入了创建线程的工厂接口------ThreadFactory,该接口中有一个方法:

	Thread newThread(Runnable r);

  newThread方法可以用来创建线程,该方法的参数r代表所创建的线程需要执行的任务。如果把线程对象看作某种”产品“,new方式创建线程就好比手工制作,而使用ThreadFactory接口创建线程就像流水线生产。开发者可以在ThreadFactory.newThread方法中封装线程创建的逻辑,使得能够以统一的方式为线程的创建、配置做一些非常有用的工作。

2.4 线程的暂挂和恢复

  Thread.suspend()与Thread.resume()两个方法是已经废弃的方法,作用是暂挂和恢复线程。在实现这两个功能时,可以用于停止线程相似的思想来实现线程的暂挂与恢复:设置一个线程暂挂标志,线程每次执行比较耗时的操作前都先检查一下这个标志。如果该标志指示线程应该暂挂,那么线程就执行Object.wait()/Condition.await()暂停,直到其他线程重新设置暂挂标志并将其唤醒。示例代码如下:

/*
 * 线程暂挂、恢复工具类
 */
public class PauseControl extends ReentrantLock {
	private static final long serialVersionUID = 176912639934052187L;
	// 线程暂挂标志
	private volatile boolean suspended = false;
	private final Condition condSuspended = newCondition();

	/**
	 * 暂停线程
	 */
	public void requestPause() {
	    suspended = true;
	}

	/**
	 * 恢复线程
	 */
	public void proceed() {
	    lock();
	    try {
	    	suspended = false;
	    	condSuspended.signalAll();
	    } finally {
	    	unlock();
	    }
	}

	/**
	 * 当前线程仅在线程暂挂标记不为true的情况下才执行指定的目标动作。
	 */
	public void pauseIfNeccessary(Runnable targetAction) throws InterruptedException {
	    lock();
	    try {
	    	while (suspended) {
	    		condSuspended.await();
	    	}
	    	targetAction.run();
	    } finally {
	    	unlock();
	    }
	}
}

public class ThreadPauseDemo {
	final static PauseControl pc = new PauseControl();

	public static void main(String[] args) {
	    final Runnable action = new Runnable() {
	    	@Override
	    	public void run() {
	    		Debug.info("Master,I'm working...");
	    		Tools.randomPause(300);
	    	}
	    };
	    Thread slave = new Thread() {
	    	@Override
	    	public void run() {
	    		try {
	    			for (;;) {
	    				pc.pauseIfNeccessary(action);
	    			}
	    		} catch (InterruptedException e) {
	    		}
	    	}
	    };
	    slave.setDaemon(true);
	    slave.start();
	    askOnBehaveOfSlave();
	}

	static void askOnBehaveOfSlave() {
	    String answer;
	    int minPause = 2000;
	    try (Scanner sc = new Scanner(System.in)) {
	    	for (;;) {
	    		Tools.randomPause(8000, minPause);
	    		pc.requestPause();
	    		Debug.info("Master,may I take a rest now?%n");
	    		Debug.info("%n(1) OK,you may take a rest%n" + "(2) No, Keep working!%nPress any other key to quit:%n");
	    		answer = sc.next();
	    		if ("1".equals(answer)) {
	    			pc.requestPause();
	    			Debug.info("Thank you,my master!");
	    			minPause = 8000;
	    		} else if ("2".equals(answer)) {
	    			Debug.info("Yes,my master!");
	    			pc.proceed();
	    			minPause = 2000;
	    		} else {
	    			break;
	    		}
	    	}
	    }
	    Debug.info("Game over!");
	}
}

  测试结果关键部分如下:

  当线程运行一段时间后,会暂停下来,由开发者决定其是否继续运行或者“休息”。

2.5 线程的高效利用:线程池

  线程的开销主要包括以下几个方面:

  • 1、线程的创建于启动的开销。与普通的对象相比,Java线程还占用了额外的存储空间-----栈空间。并且,线程的启动会产生相应的线程调度开销。
  • 2、线程的销毁。
  • 3、线程调度的开销。线程的调度会导致上下文切换,从而增加处理器资源的消耗,使得应用程序本身可以使用的处理器资源减少。
  • 4、一个系统能够创建的线程重视受限于该系统所拥有的处理器数目。无论是CPU密集型还是I/O密集型线程,这些线程的数量的临界值总是处理器的数目。

  从整个系统或整个主机的角度看,都需要一种有效使用线程的方式,线程池就是一种常见方式。
  常见的对象池(比如数据库连接池)的实现方式是对象池(本身也是个对象)内部维护一定数量的对象,客户端需要一个对象的时候就向对象池申请一个对象,用完后再将该独享返还给对象池,于是对象池中的一个对象就可以先后为多个客户端线程服务。
  线程池本身也是一个对象,不过它的实现方式和普通的对象池不同,图示如下:

  线程池内部可以预先创建一定数量的工作者线程,客户端代码并不需要向线程池借用线程,而是将其执行的任务作为一个对象提交给线程池,线程池可能将这些任务缓存在工作队列自重,而线程池内部的各个工作者线程则不断地从队列中取出任务并执行。因此,线程池可以被看作基于生产者—消费者模式的一种服务,该服务内部维护的工作者线程相当于消费者线程,线程池的客户端相当于生产者线程,客户端代码提交给线程池的任务相当于“产品”,线程池内部用于缓存任务的队列相当于传输通道。
  ThreadPoolExecutor类就是一个线程池,客户端可以用ThreadPoolExecutor.submit向其提交任务。
  线程池内部维护的工作者线程的数量称为该线程池的线程池大小。ThreadPoolExecutor的线程池大小有3种形态:当前线程池大小表示线程池中实际工作者线程的数量;最大线程池大小表示线程池中允许存在的工作者线程的数量上限;核心线程大小表示一个不大于最大线程池大小的工作者线程数量上限。他们之间的关系(有两种)如下:

  • 当前线程池大小 <= 核心线程池大小 <= 最大线程池大小
  • 核心线程池大小 <= 当前线程池大小 <= 最大线程池大小
      “核心线程池大小”和“最大线程池大小”都是由开发者或系统配置数据指定的一个阈值。
      ThreadPoolExecutor构造方法较多,包含参数最多的一个构造方法如下:
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

  workQueue被称为工作队列的阻塞队列,相当于生产者—消费者模式中的传输通道;corePoolSize用于指定线程池核心大小;maximumPoolSize用于指定最大线程池大小;keepAliveTime和unit合在一起用于指定线程池中空闲线程的最大存活时间;threadFactory指定用于创建工作者线程的线程工厂。
  在初始状态下,客户端没提交一个任务,线程池就创建一个工作者线程来处理该任务。随着客户端不断地提交任务,当前线程池大小也相应增加。在当前线程池大小达到核心线程池大小的时候,新来的任务会被存入工作队列之中。这些缓存的任务由线程池中所有的工作者线程负责取出并进行执行。线程池将任务存入工作队列的时候调用的是BlockingQueue的非阻塞方法offer(E e),因此工作队列满并不会使提交任务的客户端线程暂停。
  当工作队列满的时候,线程池会继续创建新的工作者线程,直到当前线程池大小达到最大线程池大小。线程池是通过调用threadFactory.newThread方法来创建工作者线程的。如果在创建线程池的时候没有指定线程工厂,那么ThreadPoolExecutor会调用Executors.defaultThreadFactory()所返回的默认线程工厂。当线程池饱和(即工作者队列满且当前线程池大小达到最大线程池大小的情况)时,客户端试图提交的任务会被拒绝。为了提高线程池的可靠性,引入了RejectedExecutionHandler接口用于封装被拒绝的任务的处理策略。该接口仅定义了如下方法:

	void rejectedExecution(Runnable r, ThreadPoolExecutor e)

  r参数代表被拒绝的任务,e代表拒绝任务r的线程池实例。在初始化线程池时,可以指定一个handler或者通过setRejectedExecutionHandler方法来为线程池关联一个RejectedExecutionHandler。当客户端提交的任务被拒绝时,线程池锁关联的RejectedExecutionHandler的rejectedExecution方法会被线程池调用。如果默认的RejectedExecutionHandler无法满足要求,可以优先考虑ThreadPoolExecutor自身提供的其他RejectedExecutionHandler,其次才去考虑使用自行实现的RejectedExecutionHandler接口。
  ThreadPoolExecutor提供的RejectedExecutionHandler实现类如下:

  在当前线程大小超过线程池核心大小的时候,超过线程池核心大小部分的工作者线程空间达到keepAliveTime所指定的时间后就会被清理掉,即这些工作者线程会自动终止并被从线程池移除。这种空闲线程清理机制有利于节约有限的线程资源,但是keepAliveTime设置的不合理(特别是设置的太小),可能会导致工作者线程频繁地被清理和创建,反而增加了开销。
  线程池中数量等于核心线程池大小的那部分工作者线程,习惯上称之为核心线程。ThreadPoolExecutor.prestartAllCoreThreads()可以使线程池在未接收到任何任务的情况下预先创建并启动所有核心线程,这样可以减少任务被线程池处理时所需的时间(等待核心线程的创建于启动)。
  ThreadPoolExecutor.shutdown()/shutdownNow()方法可以用来关闭线程池。使用shutdown()关闭线程池的时候,已提交的任务会被继续执行,新提交的任务会被拒绝。ThreadPoolExecutor.shutdown()返回的时候线程池可能尚未关闭,即线程池中可能还有工作者线程正在执行,应用代码可以调用ThreadPoolExecutor.awaitTermination(long timeout,TimeUnit unit)来等待线程池关闭结束。
  使用ThreadPoolExecutor.shutdownNow()关闭线程池的时候,正在执行的任务会被停止,已提交而等待执行的任务也不会被执行,该方法的返回值是已提交而未被执行的任务列表,这为被取消的任务重试提供了一个机会。ThreadPoolExecutor.shutdownNow()内部是通过调用工作者线程的interrupt方法来停止正在执行的任务的,因此某些无法响应中断的任务可能永远也不会停止。反过来说,在关闭线程池的时候,如果能确保已经提交的任务都已执行完毕并没有新的任务被提交,那么调用ThreadPoolExecutor.shutdownNow()总是安全可靠的。

2.5.1 任务的处理结果、异常处理与取消

  如果客户端关系任务的处理结果,可以使用另一个submit方法来提交任务,如下:

	public <T> Future<T> submit(Callable<T>  task)

  task参数代表客户端需要提交的任务,其类型为Callable,Callable接口定义的唯一方法是:

	V call() throws Exception

  Callable接口也是对任务的抽象:任务的处理逻辑可以在Callable接口实现类的call方法中实现。Callable接口相当于一个增强型的Runnable接口,call方法的返回值代表相应任务的处理结果,而Runnable接口中的run方法既无返回值也不能抛出异常。Executors.callable(Runnable task,T result)可以将Runnable接口转换为Callable接口实例。
  上面submit方法的返回值类型为Future。Future接口实例可以被看作提交给线程池执行的任务的处理结果权柄,Future.get()可以用来获取task参数所指定的任务的处理结果,如下:

	V get() throws InterruptedException,ExecutionException

  Future.get()被调用时,如果相应的任务尚未执行完毕,那么Future.get()会使当前线程暂停,直到相应的任务执行结束(包括正常结束和抛出异常而终止),因此,Future.get()是个阻塞方法,该方法能够抛出InterruptedException说明它可以相应线程中断。另外,假设相应的任务执行过程中抛出一个任意的异常originalException,那么Future.get()方法本身就会抛出相应的ExecutionException异常,调用这个异常的getCause()方法可返回originalException异常。因此,客户端代码可以通过捕获Future.get()调用抛出来的异常来了解相应任务执行过程中抛出的异常。
  由于在任务未执行完毕的情况下调用Future.get()方法来获取该任务的处理结果会导致等待并由此导致上下文切换,因此客户端代码应该尽可能早地向线程池提交任务,并尽可能晚地调用Future.get()方法来获取任务的处理结果,而线程池则正好利用这段时间来执行已提交的任务。

  客户端代码应该尽可能早地向线程池提交任务,并仅在需要相应任务的处理结果数据的那一刻才调用Future.get()方法。

  Future接口还支持任务的取消,具体方法如下:

	boolean cancel(boolean mayInterruptIfRunning)

  该方法的返回值表示相应的任务取消是否成功。任务取消失败的原因包括取消的任务已经执行完毕或正在执行、已经被取消已经其他无法取消因素。参数mayInterruptIfRunning表示是否允许通过给相应任务的执行线程发送中断来取消任务。Future.isCancalled()返回值代表相应的任务是否被取消成功。由于一个任务被成功取消之后,相应的Future.get()调用会抛出CancellationException异常(运行时异常),因此如果任务有可能会被取消,那么在获取任务的处理结果之前,我们需要先判断任务是否已经被取消了。
  Future.isDone()方法可以检测相应的任务是否执行完毕。任务执行完毕、执行过程中抛出异常已经任务被取消都会导致该方法返回true。
  Future.get()会使其执行线程无限等待,直线相应的任务执行结束。此时可以使用另一个get方法,具体如下:

	V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException

  该方法的作用与Future.get()相同,不过它允许指定一个等待超时时间。如果在该时间内相应的任务未执行结束,那么该方法会抛出TimeoutException。由于该方法参数中指定的超时时间仅仅用于控制客户端线程(即该方法的执行先传给你)等待相应任务的处理结果最多会等待多长时间,而非任务本身的执行时间限制,因此,客户端线程通常需要在捕获TimeoutException之后执行Future.cancel(true)来取消相应任务的执行。

2.5.2 线程池监控

  尽管线程池的大小、工作队列的容量、线程空闲时间限制这些线程池的属性可以通过配置的方式进行制定,但是所指定的值是否恰当还是需要通过监控来判断。ThreadPoolExecutor类中对线程池进行监控的方法如下:

  此外,ThreadPoolExecutor提供的两个钩子方法:beforeExecute(Thread t,Runnable r)和afterExecute(Thread t,Runnable r)也能用于实现监控。假设executor为任意一个ThreadPoolExecutor实例,在人一个亿任务r被线程池executor中的任意一个工作者线程t执行前,executor.beforeExecute(t,r)会被执行;当t执行完r之后,不管r的执行是否是成功的还是抛出了异常,executor.afterExecute(t,r)始终会被执行。因此,如果有必要的话可以通过创建ThreadPoolExecutor的子类并在子类的
beforeExecute/afterExecute方法中实现监控逻辑,比如计算任务执行的平均时间。

2.5.3 线程池死锁

  如果线程池中执行的任务在其执行过程中又会向同一个线程池提交另一个人任务,而前一个任务的执行结束又依赖后一个任务的执行结果,那么就有可能出现这样的情形:线程池中的所有工作者线程都处于等待其他任务的处理结果而这些任务仍在工作队列中等待执行,由于线程池中已经没有可以对工作队列中的任务进行处理的工作者线程,这种等待就会一直持续下去从而形成死锁。
  因此,适合提交给同一线程池实例执行的任务是相互独立的任务,而不是彼此有依赖关系的任务。对于彼此存在依赖关系的任务,可以考虑使用不同的线程池实例来执行这些任务

  同一个线程池只能用于执行相互独立的任务。彼此有依赖关系的任务需要提交给不同的线程池执行以避免死锁。

2.5.4 工作者线程的异常终止

  如果任务是通过ThreadPoolExecutor.submit调用提交给线程池的,那么这些任务在其执行过程中即便抛出了异常也不会导致对其进行执行的工作者线程异常终止。
  如果任务是通过ThreadPoolExecutor.execute方法提交给线程池的,那么这些任务在其执行过程中一旦抛出了未捕获的异常,则对其进行执行的工作者线程就会异常终止。


本文链接: http://www.dtmao.cc/news_show_150366.shtml

附件下载

上一篇:【Git】概念

下一篇:【Git】结构

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?