diff --git a/src/concurrency.md b/src/concurrency.md index 88c597f..a75e0a6 100644 --- a/src/concurrency.md +++ b/src/concurrency.md @@ -3,6 +3,7 @@ | Recipe | Crates | Categories | |--------|--------|------------| | [Spawn a short-lived thread][ex-crossbeam-spawn] | [![crossbeam-badge]][crossbeam] | [![cat-concurrency-badge]][cat-concurrency] | +| [Create a parallel data pipeline][ex-crossbeam-pipeline] | [![crossbeam-badge]][crossbeam] | [![cat-concurrency-badge]][cat-concurrency] | | [Pass data between two threads][ex-crossbeam-spsc] | [![crossbeam-badge]][crossbeam] | [![cat-concurrency-badge]][cat-concurrency] | | [Maintain global mutable state][ex-global-mut-state] | [![lazy_static-badge]][lazy_static] | [![cat-rust-patterns-badge]][cat-rust-patterns] | | [Calculate SHA1 sum of *.iso files concurrently][ex-threadpool-walk] | [![threadpool-badge]][threadpool] [![walkdir-badge]][walkdir] [![num_cpus-badge]][num_cpus] [![ring-badge]][ring] | [![cat-concurrency-badge]][cat-concurrency][![cat-filesystem-badge]][cat-filesystem] | @@ -16,6 +17,7 @@ [ex-crossbeam-spawn]: concurrency/threads.html#spawn-a-short-lived-thread +[ex-crossbeam-pipeline]: concurrency/threads.html#create-a-parallel-pipeline [ex-crossbeam-spsc]: concurrency/threads.html#pass-data-between-two-threads [ex-global-mut-state]: concurrency/threads.html#maintain-global-mutable-state [ex-threadpool-walk]: concurrency/threads.html#calculate-sha256-sum-of-iso-files-concurrently diff --git a/src/concurrency/thread/crossbeam-complex.md b/src/concurrency/thread/crossbeam-complex.md new file mode 100644 index 0000000..7842783 --- /dev/null +++ b/src/concurrency/thread/crossbeam-complex.md @@ -0,0 +1,85 @@ +## Create a parallel pipeline + +[![crossbeam-badge]][crossbeam] [![cat-concurrency-badge]][cat-concurrency] + +This example uses the [crossbeam] and [crossbeam-channel] crates to create +a parallel pipline, similar to that described in the ZeroMQ [guide] +There is a data source and a data sink, with data being processed by two worker +threads in parallel on its way from the source to the sink. + +We use bounded channels with a capacity of one using +[`crossbeam_channel::bounded`]. The producer must be on its own thread because +it produces messages faster than the workers can process them (since they sleep +for half a second) - this means the producer blocks on the call to +`[crossbeam_channel::Sender::send`] for half a second until one of the workers +processes the data in the channel. Also note that the data in the channel is +consumed by whichever worker calls receive first, so each message is delivered +to a single worker rather than both workers. + +Reading from the channels via the iterator +[`crossbeam_channel::Receiver::iter`] method will block, either waiting +for new messages or until the channel is closed. Because the channels were +created within the [`crossbeam::scope`], we must manually close them via `drop` +to prevent the entire program from blocking on the worker for-loops. You can +think of the calls to `drop` as signaling that no more messages will be sent. + + +```rust +extern crate crossbeam; +extern crate crossbeam_channel; + +use std::thread; +use std::time::Duration; +use crossbeam_channel::bounded; + +fn main() { + let (snd1, rcv1) = bounded(1); + let (snd2, rcv2) = bounded(1); + let n_msgs = 4; + let n_workers = 2; + + crossbeam::scope(|s| { + // Producer thread + s.spawn(|_| { + for i in 0..n_msgs { + snd1.send(i).unwrap(); + println!("Source sent {}", i); + } + // Close the channel - this is necessary to exit + // the for-loop in the worker + drop(snd1); + }); + + // Parallel processing by 2 threads + for _ in 0..n_workers { + // Send to sink, receive from source + let (sendr, recvr) = (snd2.clone(), rcv1.clone()); + // Spawn workers in separate threads + s.spawn(move |_| { + thread::sleep(Duration::from_millis(500)); + // Receive until channel closes + for msg in recvr.iter() { + println!("Worker {:?} received {}.", + thread::current().id(), msg); + sendr.send(msg * 2).unwrap(); + } + }); + } + // Close the channel, otherwise sink will never + // exit the for-loop + drop(snd2); + + // Sink + for msg in rcv2.iter() { + println!("Sink received {}", msg); + } + }).unwrap(); +} +``` + +[`crossbeam::scope`]: https://docs.rs/crossbeam/*/crossbeam/fn.scope.html +[crossbeam-channel]: https://docs.rs/crossbeam-channel/*/crossbeam_channel/index.html +[`crossbeam_channel::bounded`]: https://docs.rs/crossbeam-channel/*/crossbeam_channel/fn.bounded.html +[`crossbeam_channel::Receiver::iter`]: https://docs.rs/crossbeam-channel/*/crossbeam_channel/struct.Receiver.html#method.iter +[`crossbeam_channel::Sender::send`]: https://docs.rs/crossbeam-channel/*/crossbeam_channel/struct.Sender.html#method.send +[guide]: http://zguide.zeromq.org/page:all#Divide-and-Conquer diff --git a/src/concurrency/threads.md b/src/concurrency/threads.md index aac649f..ab31fb3 100644 --- a/src/concurrency/threads.md +++ b/src/concurrency/threads.md @@ -2,6 +2,8 @@ {{#include thread/crossbeam-spawn.md}} +{{#include thread/crossbeam-complex.md}} + {{#include thread/crossbeam-spsc.md}} {{#include thread/global-mut-state.md}}