Table 23.3 Task-Defining Classes

Classes in the java.util.concurrent package to define tasks | Description
ForkJoinTask<V> | Abstract class that defines a task
RecursiveAction extends ForkJoinTask<Void> | Abstract class that should be extended to define a task that does not return a value
RecursiveTask<V> extends ForkJoinTask<V> | Abstract class that should be extended to define a task that returns a value
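
To make the distinction between the two subclasses in the table concrete, the following is a minimal sketch of a task that extends RecursiveAction and therefore returns no value. The class names SquareAction and SquareActionDemo, the threshold value, and the choice of squaring elements in place are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

// Hypothetical task for illustration: squares each array element in place.
class SquareAction extends RecursiveAction {
  static final int THRESHOLD = 1_000;        // Assumed cutoff, not a tuned value.
  private final int[] values;
  private final int fromIndex;
  private final int toIndex;

  SquareAction(int[] values, int fromIndex, int toIndex) {
    this.values = values;
    this.fromIndex = fromIndex;
    this.toIndex = toIndex;
  }

  @Override
  protected void compute() {                 // Note the void return type.
    if (toIndex - fromIndex < THRESHOLD) {
      // Base case: process the subarray directly.
      for (int i = fromIndex; i < toIndex; i++) {
        values[i] *= values[i];
      }
    } else {
      // Split into two halves and run both; there are no results to combine.
      int middle = fromIndex + (toIndex - fromIndex) / 2;
      ForkJoinTask.invokeAll(new SquareAction(values, fromIndex, middle),
                             new SquareAction(values, middle, toIndex));
    }
  }
}

class SquareActionDemo {
  // Hypothetical helper: runs the action in the common pool and returns the array.
  static int[] squareAll(int[] values) {
    ForkJoinPool.commonPool().invoke(new SquareAction(values, 0, values.length));
    return values;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(squareAll(new int[] {1, 2, 3, 4})));
  }
}
```

Because the task modifies the array it was given, there is nothing to return or combine. A task that must produce a value would extend RecursiveTask<V> instead.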

Example 23.6 illustrates using the Fork/Join Framework. It counts the number of values in an array of random integers that satisfy a given predicate (in this particular case, divisibility by 7). The FilterTask class at (1) extends the RecursiveTask<Integer> abstract class to define a task for this purpose. It implements the compute() method at (2), which follows the divide-and-conquer approach.

The array to filter for numbers divisible by 7 is successively divided into two subarrays of equal (or nearly equal) size, delimited by the indices fromIndex, middle, and toIndex. If the size of a subarray is less than the threshold, the base case at (3) is executed. It iterates over the subarray to do the counting and returns the result. Note that the base case does not entail further subdivision of the task.

If the size of the subarray is equal to or greater than the threshold, two new filter tasks are created at (4), each covering one half of the current subarray. The two new filter tasks are forked at (5) by calling the invokeAll() static method, which returns when both subtasks have completed. Recursion occurs when the two subtasks are executed and their compute() method is called. The result returned from each subtask can be obtained by calling the join() method inherited from the superclass ForkJoinTask. These results are combined at (6) to return the result for the current subarray.
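
The invokeAll() call at (5) is one way to schedule the two subtasks. A commonly seen alternative is to fork one subtask and compute the other directly in the current thread before joining. The following is a minimal sketch of that variant, not the approach used in the example; the class names ForkingFilterTask and ForkingFilterDemo are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.IntPredicate;

// Hypothetical variant of a counting task that uses fork() and join() explicitly.
class ForkingFilterTask extends RecursiveTask<Integer> {
  static final int THRESHOLD = 10_000;
  private final int[] values;
  private final int fromIndex;
  private final int toIndex;
  private final IntPredicate predicate;

  ForkingFilterTask(int[] values, int fromIndex, int toIndex,
                    IntPredicate predicate) {
    this.values = values;
    this.fromIndex = fromIndex;
    this.toIndex = toIndex;
    this.predicate = predicate;
  }

  @Override
  protected Integer compute() {
    if (toIndex - fromIndex < THRESHOLD) {
      // Base case: count matching values in the subarray.
      int count = 0;
      for (int i = fromIndex; i < toIndex; i++) {
        if (predicate.test(values[i])) {
          ++count;
        }
      }
      return count;
    }
    int middle = fromIndex + (toIndex - fromIndex) / 2;
    var left = new ForkingFilterTask(values, fromIndex, middle, predicate);
    var right = new ForkingFilterTask(values, middle, toIndex, predicate);
    left.fork();                         // Schedule the left half asynchronously.
    int rightCount = right.compute();    // Compute the right half in this thread.
    return left.join() + rightCount;     // Wait for the left half and combine.
  }
}

class ForkingFilterDemo {
  // Hypothetical helper: runs the task in the common pool.
  static int count(int[] values, IntPredicate predicate) {
    return ForkJoinPool.commonPool()
        .invoke(new ForkingFilterTask(values, 0, values.length, predicate));
  }

  public static void main(String[] args) {
    int[] data = new int[100_000];
    for (int i = 0; i < data.length; i++) { data[i] = i + 1; }
    System.out.println(count(data, n -> n % 7 == 0));
  }
}
```

Computing one half in the current thread keeps that thread busy instead of leaving it to wait for both subtasks. Calling fork() on the first subtask before computing the second matters: joining a task before the other half has started would serialize the work.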

The main() method of the ForkJoinDemo class shows the steps to set up the fork-join pool for execution. The initial task is created at (9). The fork-join pool is created at (10), and its invoke() method is called with the task at (11) to initiate the computation. The invoke() method returns the combined result of all the subtasks that were created in executing the initial task.

The power of parallel streams is evident from the stream-based solution at (12), which expresses in a single pipeline what the explicit Fork/Join Framework solution needs a separate task class to do.

Example 23.6 Filtering Values Using Fork/Join Framework


package forkjoin;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
class FilterTask extends RecursiveTask<Integer> {                        // (1)
  public static final int THRESHOLD = 10_000;
  private int[] values;
  private int fromIndex;
  private int toIndex;
  private IntPredicate predicate;
  public FilterTask(int[] values, int fromIndex, int toIndex,
                    IntPredicate predicate) {
    this.values = values;
    this.fromIndex = fromIndex;
    this.toIndex = toIndex;
    this.predicate = predicate;
  }
  @Override
  protected Integer compute() {                                          // (2)
    if (toIndex - fromIndex < THRESHOLD) {
      // The base case:                                                     (3)
      var count = 0;
      for (int i = fromIndex; i < toIndex; i++) {
        if (predicate.test(values[i])) {
          ++count;
        }
      }
      return count;
    } else {
      // Create subtasks:                                                   (4)
      var middle = fromIndex + (toIndex - fromIndex) / 2;
      var subtask1 = new FilterTask(values, fromIndex, middle, predicate);
      var subtask2 = new FilterTask(values, middle, toIndex, predicate);
      // Fork and execute the subtasks. Await completion.                   (5)
      ForkJoinTask.invokeAll(subtask1, subtask2);
      // Combine the results returned by subtasks.                          (6)
      return subtask1.join() + subtask2.join();
    }
  }
}
public class ForkJoinDemo {
  public static void main(String[] args) {
    // Set up the array with the random int values:                         (7)
    final int SIZE = 1_000_000;
    int[] numbers = new Random().ints(SIZE).toArray();
    // Predicate to filter numbers divisible by 7:                          (8)
    IntPredicate predicate = i -> i % 7 == 0;
    // Create the initial task.                                             (9)
    var filterTask = new FilterTask(numbers, 0, numbers.length, predicate);
    // Create fork-join pool to manage execution of the task.               (10)
    var pool = new ForkJoinPool();
    // Perform the task, await completion, and return the result:           (11)
    var result = pool.invoke(filterTask);
    System.out.println("Fork/Join:       " + result);

    // Solution using parallel stream:                                      (12)
    System.out.println("Parallel stream: " +
                        IntStream.range(0, numbers.length).parallel()
                                 .map(i -> numbers[i]).filter(predicate).count());
  }
}

Probable output from the program:

Fork/Join:       142733
Parallel stream: 142733
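
The stream pipeline at (12) generates indices and maps each one to an array element. As a sketch of an alternative formulation, the array can be streamed directly with Arrays.stream(), and the result can be cross-checked against a plain sequential loop. The class name StreamCountSketch and its method names are hypothetical. Note that count() returns a long, whereas the FilterTask returns an Integer.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.function.IntPredicate;

// Hypothetical helper class for illustration.
class StreamCountSketch {
  // Counts matching values using a parallel stream over the array elements.
  static long countParallel(int[] values, IntPredicate predicate) {
    return Arrays.stream(values).parallel().filter(predicate).count();
  }

  // Counts matching values with a sequential loop, for cross-checking.
  static long countSequential(int[] values, IntPredicate predicate) {
    long count = 0;
    for (int value : values) {
      if (predicate.test(value)) {
        ++count;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    int[] numbers = new Random().ints(1_000_000).toArray();
    IntPredicate divisibleBy7 = i -> i % 7 == 0;
    long parallel = countParallel(numbers, divisibleBy7);
    long sequential = countSequential(numbers, divisibleBy7);
    // The two counts should agree, since the predicate is stateless.
    System.out.println("Counts agree: " + (parallel == sequential));
  }
}
```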