`
BradyZhu
  • 浏览: 248180 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

使用Java7提供的Fork/Join框架

 
阅读更多

在Java7中,JDK提供对多线程开发提供了一个非常强大的框架,就是Fork/Join框架。这个是对原来的Executors更

进一步,在原来的基础上增加了并行分治计算中的一种Work-stealing策略,就是指的是。当一个线程正在等待他创建的

子线程运行的时候,当前线程如果完成了自己的任务后,就会寻找还没有被运行的任务并且运行他们,这样就是和

Executors这个方式最大的区别,更加有效的使用了线程的资源和功能。所以非常推荐使用Fork/Join框架。


下面我们以一个例子来说明这个框架如何使用,主要就是创建一个含有10000个资源的List,分别去修改他的内容。


package com.bird.concursey.charpet8;

/**
 * store the name and price of a product
 * @author bird 2014年10月7日 下午11:23:14
 */
public class Product {

	private String name;
	private double price;

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public double getPrice() {
		return price;
	}

	public void setPrice(double price) {
		this.price = price;
	}

}


package com.bird.concursey.charpet8;

import java.util.ArrayList;
import java.util.List;

/**
 * generate a list of random products
 * @author bird
 * 2014年10月7日 下午11:24:47
 */
public class ProductListGenerator {
	
	public List<Product> generate(int size) {
		List<Product> list = new ArrayList<Product>();
		for(int i = 0 ; i < size; i++) {
			Product product = new Product();
			product.setName("Product" + i);
			product.setPrice(10);
			list.add(product);
		}
		return list;
	}
}


package com.bird.concursey.charpet8;

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class Task extends RecursiveAction {

	private static final long serialVersionUID = 1L;
	// These attributes will determine the block of products this task has to
	// process.
	private List<Product> products;
	private int first;
	private int last;
	// store the increment of the price of the products
	private double increment;

	public Task(List<Product> products, int first, int last, double increment) {
		super();
		this.products = products;
		this.first = first;
		this.last = last;
		this.increment = increment;
	}

	/**
	 * If the difference between the last and first attributes is greater than
	 * or equal to 10, create two new Task objects, one to process the first
	 * half of products and the other to process the second half and execute
	 * them in ForkJoinPool using the invokeAll() method.
	 */
	@Override
	protected void compute() {
		if (last - first < 10) {
			updatePrices();
		} else {
			int middle = (first + last) / 2;
			System.out.printf("Task: Pending tasks:%s\n", getQueuedTaskCount());
			Task t1 = new Task(products, first, middle + 1, increment);
			Task t2 = new Task(products, middle + 1, last, increment);
			invokeAll(t1, t2);
		}
	}

	private void updatePrices() {
		for (int i = first; i < last; i++) {
			Product product = products.get(i);
			product.setPrice(product.getPrice() * (1 + increment));
		}
	}

	public static void main(String[] args) {
		ProductListGenerator productListGenerator = new ProductListGenerator();
		List<Product> products = productListGenerator.generate(10000);
		Task task = new Task(products, 0, products.size(), 0.2);

		ForkJoinPool pool = new ForkJoinPool();
		pool.execute(task);

		do {
			System.out.printf("Main: Thread Count: %d\n",
					pool.getActiveThreadCount());
			System.out.printf("Main: Thread Steal: %d\n", pool.getStealCount());
			System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
			try {
				TimeUnit.MILLISECONDS.sleep(5);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		} while (!task.isDone());
		
		pool.shutdown();
		
		if(task.isCompletedNormally()) {
			System.out.printf("Main: The process has completed normally.\n");
		}
		
		for(Product product : products) {
			if(product.getPrice() != 12) {
				System.out.printf("Product %s: %f\n",product.getName(),product.getPrice());
			}
		}
		
		System.out.println("Main: End of the program.\n");
	}

}


In this example, you have created a ForkJoinPool object and a subclass of the
ForkJoinTask class that you execute in the pool. To create the ForkJoinPool object,
you have used the constructor without arguments, so it will be executed with its default
configuration. It creates a pool with a number of threads equal to the number of processors
of the computer. When the ForkJoinPool object is created, those threads are created and
they wait in the pool until some tasks arrive for their execution.


Since the Task class doesn't return a result, it extends the RecursiveAction class. In the
recipe, you have used the recommended structure for the implementation of the task. If the
task has to update more than 10 products, it divides those set of elements into two blocks,
creates two tasks, and assigns a block to each task. You have used the first and last
attributes in the Task class to know the range of positions that this task has to update in the
list of products. You have used the first and last attributes to use only one copy of the
products list and not create different lists for each task.


To execute the subtasks that a task creates, it calls the invokeAll() method. This is a
synchronous call, and the task waits for the finalization of the subtasks before continuing
(potentially finishing) its execution. While the task is waiting for its subtasks, the worker thread
that was executing it takes another task that was waiting for execution and executes it. With
this behavior, the Fork/Join framework offers a more efficient task management than the
Runnable and Callable objects themselves.


The invokeAll() method of the ForkJoinTask class is one of the main differences
between the Executor and the Fork/Join framework. In the Executor framework, all the tasks
have to be sent to the executor, while in this case, the tasks include methods to execute and
control the tasks inside the pool. You have used the invokeAll() method in the Task class,
that extends the RecursiveAction class that extends the ForkJoinTask class.

You have sent a unique task to the pool to update all the list of products using the execute()
method. In this case, it's an asynchronous call, and the main thread continues its execution.
You have used some methods of the ForkJoinPool class to check the status and the
evolution of the tasks that are running. The class includes more methods that can be useful
for this purpose. See the Monitoring a Fork/Join pool recipe for a complete list of
those methods.

Finally, like with the Executor framework, you should finish ForkJoinPool using the
shutdown() method.

分享到:
评论

相关推荐

    Java中的Fork/Join框架

     fork/join框架与其它ExecutorService的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃取任务执行。  fork/join...

    Fork/Join框架Package jsr166y

    Fork/Join框架Package jsr166y是Java 7并行编程类的的初步版本(Preliminary versions of classes targeted for Java 7.)

    java Fork Join框架及使用

    java Fork Join框架及使用,java自带的多线程框架,来处理多线程的问题

    Java Fork/Join框架

    Fork/Join框架是Java7中新增的一项特性,也是Java7平台的其中一项主要改进。下面我们就来简单探讨下Java的Fork/Join框架

    java fork-join框架介绍

    fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。

    ForkJoinUtil.java,一个分而治之的框架工具类

    Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是...

    fork/join 实例

    fork join 框架 生产使用实例,可以直接修改配置,实现业务。

    Fork Join框架机制详解.docx

    Java提供Fork/Join框架用于并行执行任务,核心的思想就是将一个大任务切分成多个小任务,然后汇总每个小任务的执行结果得到这个大任务的最终结果。 这种机制策略在分布式数据库中非常常见,数据分布在不同的数据库的...

    Fork:join框架与CompleteableFuture源码解析.pptx

    全网第一篇通过图文介绍Fork/Join框架与CompleteableFuture的PPT

    Java并发Fork-Join框架原理

    Java并发Fork-Join框架原理

    浅谈Java Fork/Join并行框架

    主要介绍了浅谈Java Fork/Join并行框架,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

    Java ForkJoin框架的原理及用法

    主要介绍了Java ForkJoin框架的原理及用法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

    Fork-Join框架演示

    Java Fork-Join 论文 排序

    java8中forkjoin和optional框架使用

    主要介绍了java8中forkjoin和optional框架使用心得以及用法讲解,需要的朋友参考下吧。

    ForkJoin并发框架入门示例

    介绍了ForkJoin并发框架,供有java基础者学习,工作配合使用,附件带有PPT,介绍并发与并行区别,和ForkJoin代码范例,资源来自网络,分享分享!

    Java 7并发编程实战手册

    全书分为9章,涵盖了线程管理、线程同步、线程执行器、Fork/Join框架、并发集合、定制并发类、测试并发应用等内容。全书通过60多个简单而非常有效的实例,帮助读者快速掌握Java 7多线程应用程序的开发技术。学习完...

    java-fork-join-example

    与ExecutorService其他实现不同,Fork / Join框架使用工作窃取算法( ),该算法可最大程度地利用线程,并提供了一种更简单的方式来处理产生其他任务的任务(称为子任务)。 以下列出的所有代码都可以在以下位置...

    java中的forkjoin框架的使用

    主要介绍了java中的fork join框架的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

    并发编程笔记20190526.docx

    一、 Fork/Join框架的介绍 21 1、实现步骤: 22 2、工作窃取算法 22 3、分而治之 23 4、Fork/Join使用的标准范式 24 5、Fork/Join框架的异常处理 26 6、Fork/Join框架的实现原理 26 二、闭锁CountDownLatch 28 1、...

    分布式与并行计算—Java实现并向算法.ZIP

    算法中的并行使用java的Fork / Join框架实现,他会将进程使用ForkJoinPool进行管理,并自动分配到空闲的CPU核心上来运算。由于个人PC的CPU核心数量较少,所以预期至多能产生常数倍的加速 效果。 本次实验使用的实验...

Global site tag (gtag.js) - Google Analytics