A Rust library that provides a thread pool implementation designed to execute the same operation in parallel on any number of inputs (this is sometimes called a "worker pool").
- Operations are defined by implementing the `Worker` trait.
- A `Builder` is used to configure and create a worker pool called a `Hive`.
- `Hive` is generic over:
  - The type of `Queen`, which creates `Worker` instances
  - The type of `TaskQueues`, which provides the global and worker thread-local queues for managing tasks
- Currently, two `TaskQueues` implementations are available:
  - Channel: uses a `crossbeam` channel to send tasks from the `Hive` to worker threads
    - When the `local-batch` feature is enabled, local batch queues are implemented using `crossbeam_queue::ArrayQueue`
  - Workstealing: a `crossbeam_deque::Injector` is used to submit tasks and serves as a global queue. Worker threads each have their own local queue and can take tasks either from the global queue or steal from other workers' local queues if their own queue is empty. This is a good choice for workloads that are either highly variable from task to task (in terms of processing time) or fork-join in nature (i.e., tasks that submit sub-tasks). See the sketch after this list.
- The `Hive` creates a `Worker` instance for each thread in the pool.
- Each thread in the pool continually takes a task from its queue(s), processes it using its `Worker`, and delivers the resulting `Outcome`.
- Depending on which of `Hive`'s methods are called to submit a task (or batch of tasks), the `Outcome`(s) may be returned as an `Iterator`, sent to an output `channel`, or stored in the `Hive` for later retrieval.
- A `Hive` may create `Worker`s in one of three ways: using the worker type's `Default` implementation, by cloning a `Worker` instance, or using a custom factory that implements the `Queen` or `QueenMut` trait.
- `Worker`s may be stateful, i.e., `Worker::apply()` takes a `&mut self`.
- While `Queen` is not stateful, `QueenMut` may be (i.e., its `create()` method takes a `&mut self`).
- Although it is strongly recommended to avoid panics in worker threads (and thus, within `Worker` implementations), the `Hive` does automatically restart any threads that panic.
- A `Hive` may be suspended and resumed at any time. When a `Hive` is suspended, worker threads do no work and tasks accumulate in the input queue.
- Several utility functions are provided in the util module. Notably, the `map` and `try_map` functions enable simple parallel processing of a single batch of tasks.
- Several useful `Worker` implementations are provided in the stock module. Most notable are those in the `call` submodule, which provide different ways of wrapping callables, i.e., closures and function pointers.
- The following optional features are provided via feature flags:
  - `affinity`: worker threads may be pinned to CPU cores to minimize the overhead of context-switching.
  - `local-batch` (>= 0.3.0): worker threads take batches of tasks from the global input queue and add them to a local queue, which may alleviate thread contention, especially when there are many short-lived tasks. Tasks may be `Weighted` to enable balancing unevenly sized tasks between worker threads.
  - `retry`: tasks that fail due to transient errors (e.g., temporarily unavailable resources) may be retried a set number of times, with an optional, exponentially increasing delay between retries.
  - Several alternative `channel` implementations are supported.
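For illustration, here is a minimal sketch of a `Hive` built on the workstealing `TaskQueues` implementation. It reuses the stock `ThunkWorker` described below, and assumes the workstealing queues are driven through the same builder and submission calls as the channel-based examples later in this document:

```rust
use beekeeper::bee::stock::{Thunk, ThunkWorker};
use beekeeper::hive::prelude::*;

// Build a 4-thread hive whose workers execute `Thunk`s (no-argument
// closures) returning `i32`, backed by the workstealing task queues.
let hive = OpenBuilder::new()
    .num_threads(4)
    .with_worker_default::<ThunkWorker<i32>>()
    .with_workstealing_queues()
    .build();

// Submit a batch of tasks and sum the outputs.
let total: i32 = hive
    .swarm((0..10).map(|i: i32| Thunk::from(move || i * i)))
    .into_outputs()
    .sum();
assert_eq!(285, total);
```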
To parallelize a task, you'll need two things:
- A `Worker` implementation. Your options are:
  - Use an existing implementation from the stock module (see Example 2 below)
  - Implement your own (see Example 3 below):
    - `use` the necessary traits (e.g., `use beekeeper::bee::prelude::*`)
    - Define a `struct` for your worker
    - Implement the `Worker` trait on your struct and define the `apply` method with the logic of your task
    - Do at least one of the following:
      - Implement `Default` for your worker
      - Implement `Clone` for your worker
      - Create a custom worker factory that implements the `Queen` or `QueenMut` trait
- A `Hive` to execute your tasks. Your options are:
  - Use one of the convenience methods in the util module (see Example 1 below)
  - Create a `Hive` manually using a `Builder` (see Examples 2 and 3 below, and the sketch after this list):
    - `OpenBuilder` is the most general builder
      - `OpenBuilder::new()` creates an empty `OpenBuilder`
      - `Builder::default()` creates an `OpenBuilder` with the global default settings (which may be changed using the functions in the `hive` module, e.g., `beekeeper::hive::set_num_threads_default(4)`)
    - The builder must be specialized for the `Queen` and `TaskQueues` types:
      - If you have a `Worker` that implements `Default`, use `with_worker_default::<MyWorker>()`
      - If you have a `Worker` that implements `Clone`, use `with_worker(MyWorker::new())`
      - If you have a custom `Queen`, use `with_queen_default::<MyQueen>()` if it implements `Default`, otherwise use `with_queen(MyQueen::new())`
      - If you have a custom `QueenMut`, use `with_queen_mut_default::<MyQueenMut>()` if it implements `Default`, otherwise use `with_queen_mut(MyQueenMut::new())`
      - Use the `with_channel_queues` or `with_workstealing_queues` method to configure the `TaskQueues` implementation
    - Use the `build()` method to build the `Hive`
  - Note that `Builder::num_threads()` must be set to a non-zero value, otherwise the built `Hive` will not start any worker threads until you call the `Hive::grow()` method.
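Putting those steps together, here is a compact sketch. `DoubleWorker` and its body are hypothetical, written only to illustrate the builder specialization (the error type is an arbitrary choice):

```rust
use beekeeper::bee::prelude::*;
use beekeeper::hive::prelude::*;

// A hypothetical worker that doubles its input.
#[derive(Debug, Default, Clone)]
struct DoubleWorker;

impl Worker for DoubleWorker {
    type Input = usize;
    type Output = usize;
    type Error = std::convert::Infallible;

    fn apply(
        &mut self,
        input: Self::Input,
        _: &Context<Self::Input>,
    ) -> WorkerResult<Self> {
        Ok(input * 2)
    }
}

// `DoubleWorker` implements `Default`, so the builder can be specialized
// with `with_worker_default`; `with_worker(DoubleWorker)` would work too,
// since it also implements `Clone`.
let hive = OpenBuilder::new()
    .num_threads(4)
    .with_worker_default::<DoubleWorker>()
    .with_channel_queues()
    .build();

// `swarm` preserves input order, so the outputs arrive doubled, in order.
let doubled: Vec<usize> = hive.swarm(0..10).into_outputs().collect();
assert_eq!(doubled, (0..20).step_by(2).collect::<Vec<_>>());
```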
Once you've created a `Hive`, use its methods to submit tasks for processing. There are four groups of methods available (see the sketch following this list):

- `apply`: submits a single task
- `swarm`: submits a batch of tasks given a collection of inputs with known size (i.e., anything that implements `IntoIterator<IntoIter: ExactSizeIterator>`)
- `map`: submits an arbitrary batch of tasks (i.e., anything that implements `IntoIterator`)
- `scan`: similar to `map`, except that you also provide 1) an initial value for a state variable, and 2) a function that transforms each item in the input iterator into the input type required by the `Worker`, and also has access to (and may modify) the state variable
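A sketch of the first three groups, reusing the `ThunkWorker` hive from Example 2 below (`scan` is omitted; this assumes `apply` yields a single `Outcome` and that `map`'s result supports `into_outputs()` just like `swarm`'s):

```rust
use beekeeper::bee::stock::{Thunk, ThunkWorker};
use beekeeper::hive::prelude::*;

let hive = OpenBuilder::new()
    .num_threads(4)
    .with_worker_default::<ThunkWorker<i32>>()
    .with_channel_queues()
    .build();

// `apply`: submit a single task, yielding a single `Outcome`
let _outcome = hive.apply(Thunk::from(|| 6 * 7));

// `swarm`: submit a batch with known size (a range), in input order
let squares: Vec<i32> = hive
    .swarm((0..10).map(|i: i32| Thunk::from(move || i * i)))
    .into_outputs()
    .collect();
assert_eq!(squares, vec![0, 1, 4, 9, 16, 25, 36, 49, 64, 81]);

// `map`: submit an arbitrary batch (here, a filtered iterator whose
// size is not known in advance)
let cubes: i32 = hive
    .map(
        (0..10)
            .filter(|i| i % 2 == 0)
            .map(|i: i32| Thunk::from(move || i * i * i)),
    )
    .into_outputs()
    .sum();
assert_eq!(800, cubes);
```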
There are multiple methods in each group that differ by how the task results (called `Outcome`s) are handled:

- The unsuffixed methods return an `Iterator` over the `Outcome`s in the same order as the inputs (or, in the case of `apply`, a single `Outcome`)
- The methods with the `_unordered` suffix instead return an unordered iterator, which may be more performant than the ordered iterator
- The methods with the `_send` suffix accept a channel `Sender` and send the `Outcome`s to that channel as they are completed (see the note below)
- The methods with the `_store` suffix store the `Outcome`s in the `Hive`; these may be retrieved later using the `Hive::take_stored()` method, using one of the `remove*` methods (which requires `OutcomeStore` to be in scope), or by using one of the methods on `Husk` after shutting down the `Hive` using `Hive::try_into_husk()`
When using one of the `_send` methods, you should ensure that the `Sender` is dropped after all tasks have been submitted, otherwise calling `recv()` on (or iterating over) the `Receiver` will block indefinitely.
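For example, here is a sketch reusing a `ThunkWorker<i32>` hive like the one above; it assumes the `Sender` returned by `outcome_channel()` implements `Clone`, and that the hive drops its copy of the sender once the batch's outcomes have all been sent:

```rust
let (tx, rx) = outcome_channel();
let _ids = hive.swarm_send((0..10).map(|i: i32| Thunk::from(move || i + 1)), tx.clone());

// Drop the sender we kept; otherwise iterating `rx` to exhaustion below
// would block forever waiting for more outcomes.
drop(tx);
let sum: i32 = rx.into_outputs().sum();
assert_eq!(55, sum);
```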
Within a `Hive`, each submitted task is assigned a unique ID. The `_send` and `_store` methods return the `task_id`s of the submitted tasks, which can be used to retrieve the corresponding `Outcome`s later (e.g., using `Hive::remove()`).
After submitting tasks, you may use the `Hive::join()` method to wait for all tasks to complete. Using `join` is strongly recommended when using one of the `_store` methods, otherwise you'll need to continually poll the `Hive` to check for completed tasks.
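A sketch of the `_store` workflow, assuming `swarm_store` as the `_store` variant of `swarm` per the naming scheme above:

```rust
// Submit a batch whose outcomes are stored in the hive; keep the task IDs
// in case specific outcomes need to be removed individually later.
let _task_ids = hive.swarm_store((0..10).map(|i: i32| Thunk::from(move || i * 2)));

// Block until all submitted tasks have completed...
hive.join();

// ...then retrieve all stored outcomes at once.
let _outcomes = hive.take_stored();
```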
When you are finished with a `Hive`, you may simply drop it (either explicitly, or by letting it go out of scope) - the worker threads will be terminated automatically. If you used the `_store` methods and would like to have access to the stored task `Outcome`s after the `Hive` has been dropped, and/or you'd like to re-use the `Hive`'s `Queen` or other configuration parameters, you can use the `Hive::try_into_husk()` method to extract the relevant data from the `Hive` into a `Husk` object.
Example 1:

```rust
pub fn double(i: usize) -> usize {
i * 2
}
// parallelize the computation of `double` on a range of numbers
// over 4 threads, and sum the results
const N: usize = 100;
let sum_doubles: usize = beekeeper::util::map(4, 0..N, double)
.into_iter()
.sum();
println!("Sum of {} doubles: {}", N, sum_doubles);
```

Example 2:

```rust
use beekeeper::bee::stock::{Thunk, ThunkWorker};
use beekeeper::hive::prelude::*;
// create a hive to process `Thunk`s - no-argument closures with the
// same return type (`i32`)
let hive = OpenBuilder::new()
.num_threads(4)
.thread_name("thunk_hive")
.with_worker_default::<ThunkWorker<i32>>()
.with_channel_queues()
.build();
// return results to your own channel...
let (tx, rx) = outcome_channel();
let _ = hive.swarm_send(
(0..10).map(|i: i32| Thunk::from(move || i * i)),
tx
);
assert_eq!(285, rx.into_outputs().take(10).sum());
// return results as an iterator...
let total = hive
.swarm_unordered((0..10).map(|i: i32| Thunk::from(move || i * -i)))
.into_outputs()
.sum();
assert_eq!(-285, total);
```

Example 3: Suppose you'd like to parallelize executions of a line-delimited process, such as `cat`.
This requires defining a struct to hold the process `stdin` and `stdout`, and implementing the `Worker` trait for this struct. We'll also use a custom `Queen` to keep track of the `Child` processes and make sure they're terminated properly.
```rust
use beekeeper::bee::prelude::*;
use beekeeper::hive::prelude::*;
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::process::{Child, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
#[derive(Debug)]
struct CatWorker {
stdin: ChildStdin,
stdout: BufReader<ChildStdout>,
}
impl CatWorker {
fn new(stdin: ChildStdin, stdout: ChildStdout) -> Self {
Self {
stdin,
stdout: BufReader::new(stdout),
}
}
fn write_char(&mut self, c: u8) -> io::Result<String> {
self.stdin.write_all(&[c])?;
self.stdin.write_all(b"\n")?;
self.stdin.flush()?;
let mut s = String::new();
self.stdout.read_line(&mut s)?;
s.pop(); // exclude newline
Ok(s)
}
}
impl Worker for CatWorker {
type Input = u8;
type Output = String;
type Error = io::Error;
fn apply(
&mut self,
input: Self::Input,
_: &Context<Self::Input>
) -> WorkerResult<Self> {
self.write_char(input).map_err(|error| {
ApplyError::Fatal { input: Some(input), error }
})
}
}
#[derive(Default)]
struct CatQueen {
children: Vec<Child>,
}
impl CatQueen {
fn wait_for_all(&mut self) -> Vec<io::Result<ExitStatus>> {
self.children
.drain(..)
.map(|mut child| child.wait())
.collect()
}
}
impl QueenMut for CatQueen {
type Kind = CatWorker;
fn create(&mut self) -> Self::Kind {
let mut child = Command::new("cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.unwrap();
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
self.children.push(child);
CatWorker::new(stdin, stdout)
}
}
impl Drop for CatQueen {
fn drop(&mut self) {
self.wait_for_all().into_iter().for_each(|result| {
match result {
Ok(status) if status.success() => (),
Ok(status) => {
eprintln!("Child process failed: {}", status);
}
Err(e) => {
eprintln!("Error waiting for child process: {}", e);
}
}
})
}
}
// build the Hive
let hive = OpenBuilder::new()
.num_threads(4)
.with_queen_mut_default::<CatQueen>()
.with_channel_queues()
    .build();
// prepare inputs
let inputs = (0..8).map(|i| 97 + i);
// execute tasks and collect outputs
let output = hive
.swarm(inputs)
.into_outputs()
.fold(String::new(), |mut a, b| {
a.push_str(&b);
a
})
.into_bytes();
// verify the output - note that `swarm` ensures the outputs are in
// the same order as the inputs
assert_eq!(output, b"abcdefgh");
// shutdown the hive, use the Queen to wait on child processes, and
// report errors
let (mut queen, _outcomes) = hive.try_into_husk().unwrap().into_parts();
let (wait_ok, wait_err): (Vec<_>, Vec<_>) = queen
.into_inner()
.wait_for_all()
.into_iter()
.partition(Result::is_ok);
if !wait_err.is_empty() {
panic!(
"Error(s) occurred while waiting for child processes: {:?}",
wait_err
);
}
let exec_err_codes: Vec<_> = wait_ok
.into_iter()
.map(Result::unwrap)
.filter_map(|status| (!status.success()).then(|| status.code()))
.flatten()
.collect();
if !exec_err_codes.is_empty() {
panic!(
"Child process(es) failed with exit codes: {:?}",
exec_err_codes
);
}
```

Early versions of this crate (< 0.3) had some fatal design flaws that needed to be corrected with breaking changes (see the changelog).
As of version 0.3, the beekeeper API is generally considered to be stable, but additional real-world battle-testing is desired before promoting the version to 1.0.0. If you identify bugs or have suggestions for improvement, please open an issue.
You may choose either of the following licenses:
- Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
Beekeeper began as a fork of workerpool.
