Implementing a parallel_for_each function

Learn how to create a generic parallel_for_each function in C++ using modern multithreading features to efficiently process collections.

Processing large collections of data can be a bottleneck in many applications. While C++ offers powerful tools for concurrency, a simple and generic way to parallelize iteration over a range is often desired. This article guides you through implementing a parallel_for_each function, similar to std::for_each, but designed to distribute the workload across multiple threads. We'll leverage C++11 and later features like std::thread, std::future, and std::async to achieve this.

Understanding the Need for Parallel Iteration

A plain std::for_each visits elements one at a time on the calling thread, which is fine for small datasets or for operations whose steps depend on one another. When each element can be processed independently, however, sequential execution leaves the other cores of a multi-core processor idle. A parallel_for_each aims to use those cores by dividing the input range into chunks and processing the chunks concurrently.

flowchart TD
    A[Input Range] --> B{Divide into Chunks}
    B --> C1[Chunk 1]
    B --> C2[Chunk 2]
    B --> C3[Chunk N]
    C1 --> P1(Process Chunk 1 in Thread 1)
    C2 --> P2(Process Chunk 2 in Thread 2)
    C3 --> PN(Process Chunk N in Thread N)
    P1 & P2 & PN --> D[Combine Results / Wait for Completion]
    D --> E[Output]

Conceptual flow of a parallel_for_each operation.

Core Implementation Strategy

Our parallel_for_each will take a range (defined by iterators) and a callable object (the function to apply to each element). The strategy involves:

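  1. Determining Concurrency: Decide how many tasks to run. A common starting point is std::thread::hardware_concurrency(), which may return 0 when the value cannot be determined. The count should also be capped so that it never exceeds the number of elements and does not oversubscribe the machine.
  2. Chunking the Work: Divide the total number of elements into roughly equal-sized chunks, spreading any remainder one element at a time over the first chunks.
  3. Launching Tasks: For each chunk, launch an asynchronous task (here via std::async with std::launch::async; a thread pool is an alternative) to process its portion of the range.
  4. Synchronization: Wait for all launched tasks to complete their work before returning.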
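
The chunking step is easy to get wrong by one element, so it can help to look at the arithmetic in isolation. The following is a minimal sketch, assuming a hypothetical helper named chunk_sizes that is not part of the implementation in this article; it only computes how many elements each task would receive.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (illustration only): returns the size of each chunk
// when `total` elements are split across `workers` tasks.
std::vector<std::size_t> chunk_sizes(std::size_t total, std::size_t workers) {
    assert(workers > 0);
    const std::size_t base = total / workers;   // minimum elements per chunk
    const std::size_t extra = total % workers;  // leftover elements to spread out

    std::vector<std::size_t> sizes;
    sizes.reserve(workers);
    for (std::size_t i = 0; i < workers; ++i) {
        // The first `extra` chunks take one additional element each.
        sizes.push_back(base + (i < extra ? 1 : 0));
    }
    return sizes;
}
```

Under these assumptions, chunk_sizes(10, 4) yields chunks of 3, 3, 2, and 2 elements, so no chunk differs from another by more than one element.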

#include <iostream>
#include <vector>
#include <thread>
#include <algorithm>
#include <future>
#include <numeric>
#include <iterator> // std::distance, std::advance

template <typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f) {
    long long total_elements = std::distance(first, last);
    if (total_elements == 0) {
        return;
    }

    unsigned int num_threads = std::thread::hardware_concurrency();
    if (num_threads == 0) {
        num_threads = 1; // Fallback for systems that don't report concurrency
    }

    // Never launch more tasks than there are elements
    if (total_elements < num_threads) {
        num_threads = static_cast<unsigned int>(total_elements);
    }

    long long chunk_size = total_elements / num_threads;
    long long remainder = total_elements % num_threads;

    std::vector<std::future<void>> futures;
    Iterator current_first = first;

    for (unsigned int i = 0; i < num_threads; ++i) {
        long long current_chunk_size = chunk_size + (i < remainder ? 1 : 0);
        if (current_chunk_size == 0) continue; // Skip if no elements for this thread

        Iterator current_last = current_first;
        std::advance(current_last, current_chunk_size);

        futures.push_back(std::async(std::launch::async, [current_first, current_last, f]() {
            std::for_each(current_first, current_last, f);
        }));

        current_first = current_last;
    }

    for (auto& fut : futures) {
        fut.get(); // Wait for this chunk to finish; rethrows any exception thrown by f
    }
}

// Example usage:
int main() {
    std::vector<int> data(1000000);
    std::iota(data.begin(), data.end(), 0);

    // Parallel processing: double each element
    parallel_for_each(data.begin(), data.end(), [](int& x) {
        x *= 2;
    });

    // Verify a few elements
    std::cout << "First element: " << data[0] << std::endl; // Should be 0
    std::cout << "100th element: " << data[99] << std::endl; // Should be 198
    std::cout << "Last element: " << data[data.size() - 1] << std::endl; // Should be 1999998

    // Example with a different type and operation
    std::vector<double> doubles(50000);
    std::iota(doubles.begin(), doubles.end(), 0.5);

    parallel_for_each(doubles.begin(), doubles.end(), [](double& d) {
        d = d * d;
    });

    std::cout << "\nFirst double element squared: " << doubles[0] << std::endl; // Should be 0.25
    std::cout << "Last double element squared: " << doubles[doubles.size() - 1] << std::endl; // Should be (49999.5)^2

    return 0;
}

Considerations and Enhancements

While the provided implementation is functional, several aspects can be considered for robustness and performance:

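  • Iterator Categories: std::distance and std::advance accept any input iterator, but they run in constant time only for random-access iterators. For forward or bidirectional iterators (e.g. those of std::list), both are linear, so chunking costs an extra pass over the range. The implementation does require at least forward iterators, because the range is traversed more than once and copies of the iterators are handed to other threads. For single-pass input iterators, the range could be copied into a std::vector first.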
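  • Exception Handling: If f throws, the exception is stored in the corresponding future and rethrown by fut.get(), so the exception from the first failing chunk (in chunk order) reaches the caller. The remaining futures are then destroyed during stack unwinding; because they come from std::async, their destructors block until those tasks finish, and any exceptions they hold are silently dropped. In a production environment, you might want to wait for all tasks explicitly and then rethrow the first exception, or aggregate them.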
  • Thread Pool: std::async with std::launch::async typically starts a new thread for every task. For frequent calls to parallel_for_each with small tasks, the cost of creating and tearing down those threads can dominate the useful work. A pre-initialized thread pool would be more efficient.
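  • False Sharing: When threads write to distinct objects that happen to share a cache line, the line bounces between cores (false sharing) and performance degrades. With contiguous chunks this mainly affects elements near chunk boundaries, so it matters most when chunks are small. std::vector<bool> is a separate and more serious case: it packs several elements into one memory location, so concurrent writes to different elements can be a data race rather than just a slowdown.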
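  • Return Values: If the callable f needs to return a value for each element, the design would need to be adapted to collect these results, perhaps with one future per chunk that yields that chunk's results (e.g. a std::vector<std::future<std::vector<ResultType>>>), or by having each task write into a preallocated output range.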
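
To make the exception-handling point concrete, here is a minimal sketch of one possible variant that waits for every chunk before reporting a failure. The name parallel_for_each_checked and the explicit num_tasks parameter are assumptions made for this illustration, not part of the implementation above; it keeps only the first exception in chunk order and discards the rest.

```cpp
#include <algorithm>
#include <cstddef>
#include <exception>
#include <future>
#include <iterator>
#include <vector>

// Hypothetical variant (illustration only): runs f over [first, last) in up to
// `num_tasks` chunks, waits for all of them, then rethrows the first exception.
template <typename Iterator, typename Func>
void parallel_for_each_checked(Iterator first, Iterator last, Func f,
                               std::size_t num_tasks) {
    using diff_t = typename std::iterator_traits<Iterator>::difference_type;

    const auto total = static_cast<std::size_t>(std::distance(first, last));
    if (total == 0 || num_tasks == 0) return;
    num_tasks = std::min(num_tasks, total);  // never more tasks than elements

    const std::size_t base = total / num_tasks;
    const std::size_t extra = total % num_tasks;

    std::vector<std::future<void>> futures;
    futures.reserve(num_tasks);
    for (std::size_t i = 0; i < num_tasks; ++i) {
        Iterator chunk_end = first;
        std::advance(chunk_end, static_cast<diff_t>(base + (i < extra ? 1 : 0)));
        futures.push_back(std::async(std::launch::async, [first, chunk_end, f]() {
            std::for_each(first, chunk_end, f);
        }));
        first = chunk_end;
    }

    // Drain every future so all chunks have finished before anything is rethrown.
    std::exception_ptr first_error;
    for (auto& fut : futures) {
        try {
            fut.get();
        } catch (...) {
            if (!first_error) first_error = std::current_exception();
        }
    }
    if (first_error) std::rethrow_exception(first_error);
}
```

A failing chunk still stops at the element that threw, so the elements after it in that chunk are left unprocessed; the other chunks run to completion. Collecting every exception_ptr into a vector instead of keeping only the first would be a straightforward extension if aggregation is needed.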