ActiveMQ java java class golang arrays datetime generics magento razor matlab向上取整 triggers NEJ 管理后台ui bootstrap模板 access教学视频 jq遍历元素 jquery解析json数据 matlab图像识别 java多行注释 mysql获取当前时间戳 hadoop环境变量配置 mysql新建数据库 python调用函数 python处理json文件 java环境 java抽象方法 如何查看java版本 java原始数据类型 vbs脚本 微信签名一句话至自己 图片链接生成器 修改tomcat端口 beatedit 视频加字幕软件 疯狂java win10画图 js保留两位小数 linux运维之道 一键隐藏 圆角矩形工具改变弧度
当前位置: 首页 > 学习教程  > 编程语言

并发编程:Phaser 运行阶段性并发任务

2020/8/11 19:54:38 文章标签:

目录

Phaser

案例说明

一、主程序

二、搜索线程(核心)

三、执行结果


Phaser

当一些并发任务需要分步骤执行时,可以使用该机制。(Phaser类提供了在每一步结束时同步线程的机制,这使得只有当所有线程都完成第一步后,才会有线程开始执行第二步。)

案例说明

下面模拟文件搜索的功能,它拆分为N个阶段进行工作,每个阶段完成后会进入等待,当所有线程都完成该阶段的工作之后,开始执行下一阶段的工作。

 

一、主程序

主程序创建了3个线程,来进行文件的搜索,同时Phaser也设置了3个参与线程数。三个线程统一由phaser对象进行控制,使它们同步得执行每个阶段的工作。

package xyz.jangle.thread.test.n3_5.phaser;

import java.util.concurrent.Phaser;

/**
 * 3.5 Phaser 运行阶段性并发任务
 *   当一些并发任务需要分步骤执行时,可以使用该机制。(Phaser类提供了在每一步结束时同步线程的机制,
 * 这使得只有当所有线程都完成第一步后,才会有线程开始执行第二步。)
 * 
 * @author jangle
 * @email jangle@jangle.xyz
 * @time 2020年8月11日 下午4:58:36
 * 
 */
public class M {

	public static void main(String[] args) {
		Phaser phaser = new Phaser(3);
		FileSearch program86 = new FileSearch("C:\\Program Files (x86)", "log", phaser);
		FileSearch program = new FileSearch("C:\\Program Files", "log", phaser);
		FileSearch workspace = new FileSearch("C:\\workspace", "log", phaser);
		Thread thread1 = new Thread(program86, "program86");
		Thread thread2 = new Thread(program, "program");
		Thread thread3 = new Thread(workspace, "workspace");
		thread1.start();
		thread2.start();
		thread3.start();
		try {
			thread1.join();
			thread2.join();
			thread3.join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("程序结束,phaser is terminated:" + phaser.isTerminated());

	}

}

二、搜索线程(核心)

这个线程实现了几个阶段性的工作,并在每个阶段进行等待。                          

  1. 遍历目录搜索所有符合扩展名的文件,并检查是否有记录。                         
  2. 对结果进行过滤,获取近24小时修改过的文件,并检查是否有记录。                    
  3. 输出这些记录。
  4. 完毕注销。
package xyz.jangle.thread.test.n3_5.phaser;

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 *  这个线程实现了几个阶段性的工作,并在每个阶段进行等待。
 *  1、遍历目录搜索所有符合扩展名的文件,并检查是否有记录。
 *  2、对结果进行过滤,获取近24小时修改过的文件,并检查是否有记录。
 *  3、输出这些记录。
 *  4、完毕注销。
 * @author jangle
 * @email jangle@jangle.xyz
 * @time 2020年8月11日 下午5:02:51
 * 
 */
public class FileSearch implements Runnable {

	// 需要搜索的文件夹路径
	private final String initPath;
	// 要搜索的文件扩展名
	private final String fileExtension;
	// 存储满足条件的文件全路径
	private List<String> results;
	// 对任务的不同阶段进行同步控制。 important
	private Phaser phaser;

	public FileSearch(String initPath, String fileExtension, Phaser phaser) {
		super();
		this.initPath = initPath;
		this.fileExtension = fileExtension;
		this.phaser = phaser;
		this.results = new ArrayList<String>();
	}

	@Override
	public void run() {
		File file = new File(initPath);
		if (file.isDirectory()) {
			// 递归调用,遍历所有目录与其子目录
			directoryProcess(file);
		}
		if (!checkResults()) {
			// 虽然checkResults中从phaser中注销了,但程序还会继续执行,所以这里需要手动return,结束程序。
			return;
		}
		// 筛选近24小时修改过的文件
		filterResults();
		if (!checkResults()) {
			return;
		}
		// 用于最终输出所有结果到控制台
		showInfo();
		// 注销当前任务
		phaser.arriveAndDeregister();
		System.out.println(Thread.currentThread().getName()+"程序执行完毕。");
	}

	/**
	 * 递归调用,遍历所有目录与其子目录
	 * 
	 * @param file
	 */
	private void directoryProcess(File file) {
		File[] files = file.listFiles();
		if (files != null) {
			for (int i = 0; i < files.length; i++) {
				if (files[i].isDirectory()) {
					directoryProcess(files[i]);
				} else {
					fileProcess(files[i]);
				}
			}
		}
	}

	/**
	 * 记录符合扩展名的文件路径
	 * 
	 * @param file
	 */
	private void fileProcess(File file) {
		if (file.getName().endsWith(fileExtension)) {
			results.add(file.getAbsolutePath());
		}
	}

	/**
	 * 筛选近24小时修改过的文件
	 * 
	 * @author jangle
	 * @time 2020年8月11日 下午5:24:05
	 */
	private void filterResults() {
		ArrayList<String> newResult = new ArrayList<>();
		long currentDate = new Date().getTime();
		for (int i = 0; i < results.size(); i++) {
			File file = new File(results.get(i));
			long fileDate = file.lastModified();
			if (currentDate - fileDate < TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)) {
				newResult.add(results.get(i));
			}
		}
		results = newResult;
	}

	/**
	 * 用于检查每个阶段结束之后,结果是否为空。
	 * 
	 * @author jangle
	 * @time 2020年8月11日 下午5:25:11
	 */
	private boolean checkResults() {
		if (results.isEmpty()) {
			System.out.println(Thread.currentThread().getName() + "结果为空,,阶段phaser:" + phaser.getPhase() + "。该阶段结束,当前存在参与线程数"+phaser.getRegisteredParties());
			// 无结果,则完成当前阶段工作,并且不参与后续阶段的工作。
			phaser.arriveAndDeregister();
			return false;
		} else {
			System.out.println(
					Thread.currentThread().getName() + "存在結果" + results.size() + ",阶段phaser:" + phaser.getPhase()+",当前存在参与线程数"+phaser.getRegisteredParties());
			// 有结果,则完成当前阶段工作,并且等待其他线程完成工作,而后进入后续阶段的工作。
			phaser.arriveAndAwaitAdvance();
			return true;
		}
	}

	/**
	 * 用于最终输出所有结果到控制台
	 * 
	 * @author jangle
	 * @time 2020年8月11日 下午5:41:02
	 */
	private void showInfo() {
		for (int i = 0; i < results.size(); i++) {
			File file = new File(results.get(i));
			System.out.println(Thread.currentThread().getName() + ":" + file.getAbsolutePath());
		}
		// 等待其他线程完成输出,再进入后续工作。
		phaser.arriveAndAwaitAdvance();
	}

}

三、执行结果

前3行是搜索文件数量的结果,3-6行是log文件的数量结果,“参与线程数”是指phaser内部的计数,因program无log文件,提前结束了工作内容,从phaser中注销了,故第6行值为2。

workspace存在結果9,阶段phaser:0,当前存在参与线程数3
program86存在結果13,阶段phaser:0,当前存在参与线程数3
program存在結果10,阶段phaser:0,当前存在参与线程数3
workspace存在結果3,阶段phaser:1,当前存在参与线程数3
program结果为空,,阶段phaser:1。该阶段结束,当前存在参与线程数3
program86存在結果1,阶段phaser:1,当前存在参与线程数2
program86:C:\Program Files (x86)\DingDing\main\current_new\debug.log
workspace:C:\workspace\.metadata\.log
workspace:C:\workspace\.metadata\.plugins\org.eclipse.m2e.logback.configuration\0.log
workspace:C:\workspace\.metadata\.plugins\org.eclipse.rse.core\.log
workspace程序执行完毕。
program86程序执行完毕。
程序结束,phaser is terminated:true

 


本文链接: http://www.dtmao.cc/news_show_100237.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?