Redis html bitmap jqgrid 后台ui模板 idea中svn的使用 matlab中不等于怎么表示 css鼠标悬浮样式 小程序下拉刷新样式 python中open python中for循环的用法 javaindexof java安装环境 java学习教程 java调用方法 java架构 linux下载安装 php语言入门 亚索刀光特效包 影视后期软件 神剪辑教程 saminside 矩阵分析与应用 用流量打电话的软件 特战英雄辅助 cdr怎么画波浪线 sendto函数 骰子gif win98序列号 全能音频转换通 opencv是什么 数据库同步解决方案 任务栏跑到右侧怎么办 flash制作教程 excel指数函数 飞鸽传书怎么用 ps做表格 octane渲染器 obs怎么设置 抠图ps
当前位置: 首页 > 学习教程  > 编程语言

Java多线程实战——并发框架

2020/9/19 15:46:49 文章标签:

文章目录

    • 并发架构
      • 任务执行
      • 并发的取消与关闭
      • 线程池的使用
    • 多线程应该避免的问题
      • 避免活跃性问题
      • 注意性能
      • 并发程序测试
    • 并发高阶应用
      • 显式锁
      • Java内存模型
    • 附录

并发架构

任务执行

  • 为什么需要并发框架
    更快的响应、更高的吞吐率、更可靠的稳定性

串行执行任务
例如下面这个例子:

/**
 * 串行的Web服务器: 因为此程序每次只能处理一个请求,所以其执行性能很糟糕
 */
public class SingleThreadWebServer {

  public static void main(String[] args) throws IOException {
    ServerSocket socket = new ServerSocket(80);
    while (true) {
      Socket connection = socket.accept();
      // handleRequest(connection);
    }
  }
  
}

无限制地显示为任务创建线程

线程生命周期的开销非常高:
资源消耗:活跃的线程会消耗系统资源,尤其是内存,通常可运行的线程数量多余可用处理器的数量,则大量闲置的线程会占用许多内存
稳定性:大量的独立线程的虚拟机栈、本地方法栈会占用大量内存,可能会导致OutOfMemoryError异常。

例如下面这个例子:

/**
 * 在Web服务器中为每个请求启动一个新的线程(不要这么做)
 */
public class ThreadPerTaskWebServer {

  public static void main(String[] args) throws IOException {
    ServerSocket socket = new ServerSocket(80);
    while (true) {
      final Socket connection = socket.accept();
      Runnable task = new Runnable() {
        public void run() {
          // handleRequest(connection);
        }
      };
      new Thread(task).start();
    }
  }

}
  • Executor框架

线程池

线程池与工作队列(Work Queue)密切相关,其中在工作队列中保存了所有等待执行的任务。工作者线程(Work Thread)任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

  • newFixedThreadPool: 此静态工厂方法创建一个固定长度的线程池,每当提交一个任务就创建一个线程,直到达到线程池最大数量,这时线程池规模不再变化(如果有线程异常退出,则线程池会补充一个新的线程)。
  • newSingleThreadExecutor: 此静态工厂方法是一个单线程的Executor,它创建单个工作者线程来执行任务,按照任务在队列中的顺序来串行执行(如果线程异常结束,会创建另一个线程来替代)
  • newScheduledThreadPool: 此静态工厂方法创建一个固定长度的线程池,并且以延迟或定时的方法来执行任务,类似于Timer。

Executor生命周期

Executor存在的问题: Executor以异步方式来执行任务,因此在任何时刻,之前提交任务的状态不是立即可见的。而且Executor是通过创建线程来执行任务。但JVM只有在所有(非守护)线程全部终止后才会退出。因此,若无法正确地关闭Executor,则JVM无法结束。

扩展ExecutorService接口: 添加一些用于生命周期管理的方法

public interface ExecutorService extends Executor {

  /**
   * 执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务
   */
  void shutdown();

  /**
   * 执行粗暴的关闭过程:尝试取消所有运行中的任务,并不再启动队列中尚未开始执行的任务
   * @return
   */
  List<Runnable> shutdownNow();
  
  boolean isShutdown();

  /**
   * 轮询ExecutorService是否已经终止
   * @return
   */
  boolean isTerminated();

  /**
   * 等待ExecutorService到达终止状态
   * @param timeout
   * @param unit
   * @return
   * @throws InterruptedException
   */
  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
  // ... 其他用于任务提交的便利方法

}

  • 携带结果的任务Callable与Future

Executor存在的问题:

Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,虽然run()方法能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常

  • Callable是一种更好的抽象:即它有一个主入口点(call()方法)最终会返回一个值,并可能会抛出一个异常。
  • Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及任务的结果和取消任务等。必要时,可以通过get()获取执行结果,该方法会阻塞直到任务返回结果。
// Callable与Future接口
public interface Callable<V> {
  V call() throws Exception;
}

public interface Future<V> {
  // 取消任务,成功取消返回true,失败返回false。其中mayInter...表示是否允许取消正在执行却没有执行完毕的任务
  boolean cancel(boolean mayInterruptIfRunning);
  // 任务是否被取消:取消成功,返回true
  boolean isCancelled();
  // 任务是否已经完成,已完成,返回true
  boolean isDone();
  // 获取执行结果,此方法会产生阻塞,会一直等到任务执行完毕才返回
  V get() throws InterruptedException, ExecutionException, CancellationException;
  // 用来获取执行结果,若在指定时内没获取到结果,则直接返回null
  V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException;
}

Future:
总结上面Future接口的三个功能:

  1. 判断任务是否完成;
  2. 中断任务;
  3. 获取任务执行结果。

FutureTask:
因为Future只是一个接口,无法直接用来创建对象,因此就有了FutureTask。
下图展示了FutureTask的继承关系:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-s0Zn3tL4-1600501203849)(quiver-image-url/10E01A3930B05107F951E7F94F6A97AA.jpg =267x215)]

所以,FutureTask既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

并发的取消与关闭

  • 任务取消

下面是任务取消的一个例子:

import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * 取消协作机制:设置某个"已请求取消"标志,而任务将定期地查看该标志。如果设置了这个
 * 标志,则任务将提前结束(为了使这个过程能可靠地工作,标志cancelled必须为volatile类型)
 *
 * 存在问题:下面的cancel方法由finally块调用,从而确保即使在调用sleep时被中断也能
 * 取消素数生成器的执行。如果cancel没有被调用,则搜索素数的线程将永远运行下去,不断消耗
 * CPU的时钟周期,并使得JVM不能正常退出。
 */
public class PrimeGenerator implements Runnable {
  private final List<BigInteger> primes = new ArrayList<>();
  private volatile boolean cancelled;

  @Override
  public void run() {
    BigInteger p = BigInteger.ONE;
    while (!cancelled) {
      p = p.nextProbablePrime();
      synchronized (this) {
        primes.add(p);
      }
    }
  }

  public void cancel() {
    cancelled = true;
  }

  public synchronized List<BigInteger> get() {
    return new ArrayList<>(primes);
  }

  /**
   * 一个仅运行一秒钟的素数生成器
   * @return
   * @throws InterruptedException
   */
  public static List<BigInteger> oneSecondOfPrimes() throws InterruptedException {
    PrimeGenerator generator = new PrimeGenerator();
    new Thread(generator).start();
    try {
      SECONDS.sleep(1);
    } finally {
      generator.cancel();
    }

    return generator.get();
  }
}

中断
上面的取消机制存在问题:
如果使用取消机制的任务调用了一个阻塞方法,例如BlockingQueue.put(), 则可能会产生一个更严重的问题——任务可能永远不会检查取消标志,因此永远不会结束。

/**
 * 下面的例子中:若生产者的速度超过了消费者的处理速度,队列将被填满,put()将会阻塞。
 * 当生产者在put()中阻塞时,若此时消费者希望取消生产者任务,它可以调用cancel()来设置
 * cancelled标志,但此时生产者却永远不能检查这个标志,因为它无法从阻塞的put()中恢复过来
 * (因为消费者此时已经停止从队列中取出素数,所以put()将一直保持阻塞状态)
 */
public class BrokenPrimeProducer extends Thread {

  private final BlockingQueue<BigInteger> queue;
  private volatile boolean cancelled = false;

  BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      BigInteger p = BigInteger.ONE;
      while (!cancelled)
        queue.put(p = p.nextProbablePrime());
    } catch (InterruptedException consumed) {}
  }

  public void cancel() {
    cancelled = true;
  }

  public static void consumePrimes() throws InterruptedException {
    BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<BigInteger>(10);
    BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
    producer.start();
    try {
//      while (needMorePrimes())
//        consume();
    } finally {
      producer.cancel();
    }
  }
}

线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程。

每个线程都有一个boolean类型的中断状态。当中断线程时,这个线程的中断状态将被设置为true。Thread中包含了中断线程以及查询线程中断状态的方法,如下所示:

public class Thread {
  // 中断目标线程
  public void interrupt() {...}
  // 返回目标线程的中断状态
  public boolean isInterrupted() {...}
  // 静态的interrupted方法将清除当前线程的中断状态,并返回它之前的值,也是清除中断状态的唯一方法
  public static boolean interrupted() {...}
  ...

阻塞库方法,如Thread.sleep和Object.wait等,都会检查线程何时中断,并在发现中断时提前返回。它们在响应中断时执行的操作包括:清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束。
当线程在非阻塞状态下中断时,它的中断状态将被设置,然后根据将被取消的操作来检查中断状态判断发生了中断。如果不触发InterruptedException,则中断状态将一直保持,直到明确地清除中断状态。
调用interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。有些方法,例如wait、sleep和join等,将严格地处理这种请求,当它们收到中断请求或者正在开始执行时发现某个已被设置好的中断状态时,将抛出一个异常。
通常,中断是实现取消的最合理方式。
解决上面BrokenPrimeProducer.java中呈现的问题例子如下:

/**
 * BrokenPrimeProducer中的问题很容易解决:使用中断而不是boolean标志来请求取消,
 * 如下程序每次迭代循环中,有两个位置可以检测出中断:在阻塞的put()调用中,以及在
 * 循环开始处查询中断状态时
 */
public class PrimeProducer extends Thread {

  private final BlockingQueue<BigInteger> queue;

  PrimeProducer(BlockingQueue<BigInteger> queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      BigInteger p = BigInteger.ONE;
      while (!Thread.currentThread().isInterrupted())
        queue.put(p = p.nextProbablePrime());
    } catch (InterruptedException consumed) {
      // 允许线程退出
    }
  }

  public void cancel() {
    interrupt();
  }
}

响应中断

当调用可中断的阻塞函数时,例如Thread.sleep或BlockingQueue.put等,有两种实用策略可用于处理InterruptedException:

  • 传递异常(可能在执行某个特定于任务的清除操作之后),从而使你的方法也成为可中断的阻塞方法。
  • 恢复中断状态,从而使调用栈中的上层代码能够对其进行处理。
  • 通过中断来取消任务-深入实践/总结

线程池的使用

  • 线程池基本使用-见并发基础相关内容

  • 设置线程池大小
    现有如下定义:
    N(cpu) = number of CPUs
    U(cpu) = target CPU utilization, 0 <= U(cpu) <= 1
    W/C = ratio of wait of wait time to compute time
    要使处理器达到期望的使用率,线程池的最优大小等于:
    N(threads) = N(cpu)U(cpu)(1 + W/C)

  • 配置ThreadPoolExecutor
    ThreadPoolExecutor定义了很多构造函数,下面是最常见的形式:

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler) {...}

多线程应该避免的问题

避免活跃性问题

注意性能

并发程序测试

并发高阶应用

显式锁

参考另一篇文章:
Java并发-Alluxio Metrics实践

Java内存模型

附录

  • 参考文章
    https://www.cnblogs.com/xiaoxi/p/8303574.html

本文链接: http://www.dtmao.cc/news_show_200313.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?