Implementing a parallel_for_each function

Learn how to create a generic `parallel_for_each` function in C++ using modern multithreading features to efficiently process collections.

Processing large collections of data can be a bottleneck in many applications. While C++ offers powerful tools for concurrency, a simple and generic way to parallelize iteration over a range is often desired. This article guides you through implementing a `parallel_for_each` function, similar to `std::for_each`, but designed to distribute the workload across multiple threads. We'll leverage C++11 and later features like `std::thread`, `std::future`, and `std::async` to achieve this.
Understanding the Need for Parallel Iteration
Traditional `std::for_each` iterates sequentially, which is fine for small datasets or operations with dependencies. However, when each element's processing is independent, sequential execution leaves significant performance on the table, especially on multi-core processors. A `parallel_for_each` aims to exploit these available cores by dividing the input range into chunks and processing each chunk concurrently.
```mermaid
flowchart TD
    A[Input Range] --> B{Divide into Chunks}
    B --> C1[Chunk 1]
    B --> C2[Chunk 2]
    B --> C3[Chunk N]
    C1 --> P1(Process Chunk 1 in Thread 1)
    C2 --> P2(Process Chunk 2 in Thread 2)
    C3 --> PN(Process Chunk N in Thread N)
    P1 & P2 & PN --> D[Combine Results / Wait for Completion]
    D --> E[Output]
```
Conceptual flow of a parallel_for_each operation.
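To make the independence requirement concrete, here is a small contrast between a loop that is safe to parallelize and one that is not (both functions are illustrative examples, not part of the implementation below):

```cpp
#include <cstddef>
#include <vector>

// Independent per-element work: each iteration touches only its own
// element, so iterations can run in any order, on any thread.
void scale(std::vector<double>& v, double factor) {
    for (double& x : v) {
        x *= factor; // parallelizes cleanly
    }
}

// Dependent work: each iteration reads the result of the previous one,
// so a naive chunk-by-chunk parallelization would compute wrong sums.
void prefix_sum(std::vector<double>& v) {
    for (std::size_t i = 1; i < v.size(); ++i) {
        v[i] += v[i - 1]; // order matters; not parallelizable as written
    }
}
```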
Core Implementation Strategy
Our `parallel_for_each` will take a range (defined by iterators) and a callable object (the function to apply to each element). The strategy involves:

- Determining Concurrency: Decide how many threads to use. A common approach is `std::thread::hardware_concurrency()`, but it's often beneficial to cap this to avoid oversubscription.
- Chunking the Work: Divide the total number of elements into roughly equal-sized chunks (for example, 10 elements across 4 threads yields chunks of 3, 3, 2, and 2, with the remainder spread over the first chunks).
- Launching Threads: For each chunk, launch a new thread (or use a thread pool) to process its portion of the range.
- Synchronization: Wait for all launched threads to complete their work before returning.
```cpp
#include <iostream>
#include <vector>
#include <thread>
#include <algorithm>
#include <future>
#include <numeric>

template <typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f) {
    long long total_elements = std::distance(first, last);
    if (total_elements == 0) {
        return;
    }

    unsigned int num_threads = std::thread::hardware_concurrency();
    if (num_threads == 0) {
        num_threads = 1; // Fallback for systems that don't report concurrency
    }

    // Cap threads to avoid excessive overhead for small ranges
    if (total_elements < num_threads) {
        num_threads = total_elements;
    }

    long long chunk_size = total_elements / num_threads;
    long long remainder = total_elements % num_threads;

    std::vector<std::future<void>> futures;
    Iterator current_first = first;

    for (unsigned int i = 0; i < num_threads; ++i) {
        long long current_chunk_size = chunk_size + (i < remainder ? 1 : 0);
        if (current_chunk_size == 0) continue; // Skip if no elements for this thread

        Iterator current_last = current_first;
        std::advance(current_last, current_chunk_size);

        futures.push_back(std::async(std::launch::async, [current_first, current_last, f]() {
            std::for_each(current_first, current_last, f);
        }));

        current_first = current_last;
    }

    for (auto& fut : futures) {
        fut.get(); // Wait for each thread to complete
    }
}

// Example usage:
int main() {
    std::vector<int> data(1000000);
    std::iota(data.begin(), data.end(), 0);

    // Parallel processing: double each element
    parallel_for_each(data.begin(), data.end(), [](int& x) {
        x *= 2;
    });

    // Verify a few elements
    std::cout << "First element: " << data[0] << std::endl;              // Should be 0
    std::cout << "100th element: " << data[99] << std::endl;             // Should be 198
    std::cout << "Last element: " << data[data.size() - 1] << std::endl; // Should be 1999998

    // Example with a different type and operation
    std::vector<double> doubles(50000);
    std::iota(doubles.begin(), doubles.end(), 0.5);

    parallel_for_each(doubles.begin(), doubles.end(), [](double& d) {
        d = d * d;
    });

    std::cout << "\nFirst double element squared: " << doubles[0] << std::endl;               // Should be 0.25
    std::cout << "Last double element squared: " << doubles[doubles.size() - 1] << std::endl; // Should be (49999.5)^2

    return 0;
}
```
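If you want to gauge the speedup on your machine, a minimal timing sketch follows (it assumes the `parallel_for_each` template above is in scope; the numbers depend on hardware, compiler flags, and the cost of the per-element work):

```cpp
#include <algorithm>
#include <chrono>
#include <cmath>
#include <iostream>
#include <vector>
// Assumes the parallel_for_each template from above is also in scope.

int main() {
    std::vector<double> v(10000000, 1.0);

    // A moderately expensive, independent per-element operation
    auto heavy = [](double& x) { x = std::sqrt(x) * std::sin(x); };

    auto t0 = std::chrono::steady_clock::now();
    std::for_each(v.begin(), v.end(), heavy);
    auto t1 = std::chrono::steady_clock::now();

    parallel_for_each(v.begin(), v.end(), heavy);
    auto t2 = std::chrono::steady_clock::now();

    using ms = std::chrono::milliseconds;
    std::cout << "sequential: "
              << std::chrono::duration_cast<ms>(t1 - t0).count() << " ms\n";
    std::cout << "parallel:   "
              << std::chrono::duration_cast<ms>(t2 - t1).count() << " ms\n";
    return 0;
}
```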
`std::async` with `std::launch::async` is a convenient way to launch tasks on separate threads and retrieve their results (or wait for completion) via `std::future`. It handles thread creation and management, simplifying the code compared to direct `std::thread` management.
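For contrast, here is what the same launch-and-wait logic looks like with `std::thread` managed by hand (a sketch; the name `parallel_for_each_threads` is just for illustration):

```cpp
#include <algorithm>
#include <iterator>
#include <thread>
#include <vector>

template <typename Iterator, typename Func>
void parallel_for_each_threads(Iterator first, Iterator last, Func f) {
    long long total_elements = std::distance(first, last);
    if (total_elements == 0) return;

    unsigned int num_threads = std::thread::hardware_concurrency();
    if (num_threads == 0) num_threads = 1;
    if (total_elements < num_threads) num_threads = total_elements;

    long long chunk_size = total_elements / num_threads;
    long long remainder = total_elements % num_threads;

    std::vector<std::thread> threads;
    Iterator current_first = first;
    for (unsigned int i = 0; i < num_threads; ++i) {
        long long n = chunk_size + (i < remainder ? 1 : 0);
        if (n == 0) continue;
        Iterator current_last = current_first;
        std::advance(current_last, n);
        // Unlike std::async, an exception escaping f here would call
        // std::terminate instead of being delivered through a future.
        threads.emplace_back([current_first, current_last, f]() {
            std::for_each(current_first, current_last, f);
        });
        current_first = current_last;
    }
    // Threads must be joined by hand; destroying a joinable thread terminates.
    for (auto& t : threads) {
        t.join();
    }
}
```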
Considerations and Enhancements
While the provided implementation is functional, several aspects can be considered for robustness and performance:
- Iterator Categories: The current implementation relies on `std::distance` and `std::advance`, so it is only efficient with random-access iterators; with forward iterators those calls are O(n), and input iterators cannot be used at all, since the range must be traversable more than once. For such ranges, a different chunking strategy would be needed, or the range could be copied to a `std::vector` first.
- Exception Handling: Because `std::async` is used, an exception thrown by the callable `f` is captured in its `std::future` and rethrown by `fut.get()`, but the current version stops at the first failing `get()`. In a production environment, you might want to let all chunks finish before surfacing the error, or aggregate multiple failures; see the sketch after this list.
- Thread Pool: For very frequent calls to `parallel_for_each` with small tasks, the overhead of creating and destroying threads can be significant. A pre-initialized thread pool would be more efficient.
- False Sharing: When threads operate on data that is physically close in memory but logically independent, cache line contention (false sharing) can occur, degrading performance. `std::vector<bool>` is worse still: it packs multiple elements into each byte, so concurrent writes to neighboring elements are actual data races, not merely a performance concern.
- Return Values: If the callable `f` needs to return a value for each element, the design would need to be adapted to collect these results, perhaps into a `std::vector<std::future<ResultType>>` or a preallocated output vector; see the sketch at the end of this article.
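Expanding on the exception-handling point: `fut.get()` rethrows the first exception it encounters and abandons the remaining `get()` calls (the futures still block in their destructors). A minimal sketch of a replacement for the final wait loop that drains every future first and then rethrows the first captured error (the helper name is hypothetical):

```cpp
#include <exception>
#include <future>
#include <vector>

// Hypothetical helper: wait for every chunk to finish, remember the first
// exception instead of rethrowing immediately, then surface it afterwards.
void wait_all_rethrow_first(std::vector<std::future<void>>& futures) {
    std::exception_ptr first_error = nullptr;
    for (auto& fut : futures) {
        try {
            fut.get(); // rethrows anything the callable threw in that chunk
        } catch (...) {
            if (!first_error) {
                first_error = std::current_exception();
            }
        }
    }
    if (first_error) {
        std::rethrow_exception(first_error);
    }
}
```

In `parallel_for_each`, the final loop over `futures` would simply be replaced by a call to this helper.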
A final word of caution concerns the thread safety of the callable `f`. If multiple threads write to the same memory location without proper synchronization (e.g., mutexes), you will encounter data races and undefined behavior. The provided example modifies elements in place, assuming each element is accessed by only one thread.
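And for the return-values point above, one workable design (a sketch; `parallel_map` is an illustrative name, not a standard facility, and the deduced return type needs C++14) writes results into a preallocated output vector, so each task fills a disjoint slice and no locking is needed:

```cpp
#include <cstddef>
#include <future>
#include <thread>
#include <type_traits>
#include <vector>

// Illustrative sketch: apply f to every element of in and collect the
// results. Each task writes to a disjoint slice of out, so no locking
// is needed.
template <typename T, typename Func>
auto parallel_map(const std::vector<T>& in, Func f) {
    using Result = std::decay_t<decltype(f(in[0]))>;
    std::vector<Result> out(in.size());
    if (in.empty()) return out;

    std::size_t num_threads = std::thread::hardware_concurrency();
    if (num_threads == 0) num_threads = 1;
    if (in.size() < num_threads) num_threads = in.size();

    std::size_t chunk = in.size() / num_threads;
    std::size_t remainder = in.size() % num_threads;

    std::vector<std::future<void>> futures;
    std::size_t begin = 0;
    for (std::size_t i = 0; i < num_threads; ++i) {
        std::size_t end = begin + chunk + (i < remainder ? 1 : 0);
        futures.push_back(std::async(std::launch::async,
                                     [&in, &out, f, begin, end]() {
            for (std::size_t j = begin; j < end; ++j) {
                out[j] = f(in[j]); // disjoint index ranges per task
            }
        }));
        begin = end;
    }
    for (auto& fut : futures) {
        fut.get(); // in and out stay alive until all tasks finish
    }
    return out;
}
```

Usage would look like `auto squares = parallel_map(data, [](int x) { return x * x; });`.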