mirror of
https://github.com/rust-lang-nursery/rust-cookbook
synced 2024-11-24 20:43:07 +00:00
Adds crossbeam channel example of a parallel data pipeline. (#554)
Co-authored-by: Andrew Gauger <andygauge@gmail.com>
This commit is contained in:
parent
a00ff787ce
commit
3c32c84475
3 changed files with 89 additions and 0 deletions
|
@ -3,6 +3,7 @@
|
|||
| Recipe | Crates | Categories |
|
||||
|--------|--------|------------|
|
||||
| [Spawn a short-lived thread][ex-crossbeam-spawn] | [![crossbeam-badge]][crossbeam] | [![cat-concurrency-badge]][cat-concurrency] |
|
||||
| [Create a parallel data pipeline][ex-crossbeam-pipeline] | [![crossbeam-badge]][crossbeam] | [![cat-concurrency-badge]][cat-concurrency] |
|
||||
| [Pass data between two threads][ex-crossbeam-spsc] | [![crossbeam-badge]][crossbeam] | [![cat-concurrency-badge]][cat-concurrency] |
|
||||
| [Maintain global mutable state][ex-global-mut-state] | [![lazy_static-badge]][lazy_static] | [![cat-rust-patterns-badge]][cat-rust-patterns] |
|
||||
| [Calculate SHA1 sum of *.iso files concurrently][ex-threadpool-walk] | [![threadpool-badge]][threadpool] [![walkdir-badge]][walkdir] [![num_cpus-badge]][num_cpus] [![ring-badge]][ring] | [![cat-concurrency-badge]][cat-concurrency][![cat-filesystem-badge]][cat-filesystem] |
|
||||
|
@ -16,6 +17,7 @@
|
|||
|
||||
|
||||
[ex-crossbeam-spawn]: concurrency/threads.html#spawn-a-short-lived-thread
|
||||
[ex-crossbeam-pipeline]: concurrency/threads.html#create-a-parallel-pipeline
|
||||
[ex-crossbeam-spsc]: concurrency/threads.html#pass-data-between-two-threads
|
||||
[ex-global-mut-state]: concurrency/threads.html#maintain-global-mutable-state
|
||||
[ex-threadpool-walk]: concurrency/threads.html#calculate-sha256-sum-of-iso-files-concurrently
|
||||
|
|
85
src/concurrency/thread/crossbeam-complex.md
Normal file
85
src/concurrency/thread/crossbeam-complex.md
Normal file
|
@ -0,0 +1,85 @@
|
|||
## Create a parallel pipeline
|
||||
|
||||
[![crossbeam-badge]][crossbeam] [![cat-concurrency-badge]][cat-concurrency]
|
||||
|
||||
This example uses the [crossbeam] and [crossbeam-channel] crates to create
|
||||
a parallel pipline, similar to that described in the ZeroMQ [guide]
|
||||
There is a data source and a data sink, with data being processed by two worker
|
||||
threads in parallel on its way from the source to the sink.
|
||||
|
||||
We use bounded channels with a capacity of one using
|
||||
[`crossbeam_channel::bounded`]. The producer must be on its own thread because
|
||||
it produces messages faster than the workers can process them (since they sleep
|
||||
for half a second) - this means the producer blocks on the call to
|
||||
`[crossbeam_channel::Sender::send`] for half a second until one of the workers
|
||||
processes the data in the channel. Also note that the data in the channel is
|
||||
consumed by whichever worker calls receive first, so each message is delivered
|
||||
to a single worker rather than both workers.
|
||||
|
||||
Reading from the channels via the iterator
|
||||
[`crossbeam_channel::Receiver::iter`] method will block, either waiting
|
||||
for new messages or until the channel is closed. Because the channels were
|
||||
created within the [`crossbeam::scope`], we must manually close them via `drop`
|
||||
to prevent the entire program from blocking on the worker for-loops. You can
|
||||
think of the calls to `drop` as signaling that no more messages will be sent.
|
||||
|
||||
|
||||
```rust
|
||||
extern crate crossbeam;
|
||||
extern crate crossbeam_channel;
|
||||
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use crossbeam_channel::bounded;
|
||||
|
||||
fn main() {
|
||||
let (snd1, rcv1) = bounded(1);
|
||||
let (snd2, rcv2) = bounded(1);
|
||||
let n_msgs = 4;
|
||||
let n_workers = 2;
|
||||
|
||||
crossbeam::scope(|s| {
|
||||
// Producer thread
|
||||
s.spawn(|_| {
|
||||
for i in 0..n_msgs {
|
||||
snd1.send(i).unwrap();
|
||||
println!("Source sent {}", i);
|
||||
}
|
||||
// Close the channel - this is necessary to exit
|
||||
// the for-loop in the worker
|
||||
drop(snd1);
|
||||
});
|
||||
|
||||
// Parallel processing by 2 threads
|
||||
for _ in 0..n_workers {
|
||||
// Send to sink, receive from source
|
||||
let (sendr, recvr) = (snd2.clone(), rcv1.clone());
|
||||
// Spawn workers in separate threads
|
||||
s.spawn(move |_| {
|
||||
thread::sleep(Duration::from_millis(500));
|
||||
// Receive until channel closes
|
||||
for msg in recvr.iter() {
|
||||
println!("Worker {:?} received {}.",
|
||||
thread::current().id(), msg);
|
||||
sendr.send(msg * 2).unwrap();
|
||||
}
|
||||
});
|
||||
}
|
||||
// Close the channel, otherwise sink will never
|
||||
// exit the for-loop
|
||||
drop(snd2);
|
||||
|
||||
// Sink
|
||||
for msg in rcv2.iter() {
|
||||
println!("Sink received {}", msg);
|
||||
}
|
||||
}).unwrap();
|
||||
}
|
||||
```
|
||||
|
||||
[`crossbeam::scope`]: https://docs.rs/crossbeam/*/crossbeam/fn.scope.html
|
||||
[crossbeam-channel]: https://docs.rs/crossbeam-channel/*/crossbeam_channel/index.html
|
||||
[`crossbeam_channel::bounded`]: https://docs.rs/crossbeam-channel/*/crossbeam_channel/fn.bounded.html
|
||||
[`crossbeam_channel::Receiver::iter`]: https://docs.rs/crossbeam-channel/*/crossbeam_channel/struct.Receiver.html#method.iter
|
||||
[`crossbeam_channel::Sender::send`]: https://docs.rs/crossbeam-channel/*/crossbeam_channel/struct.Sender.html#method.send
|
||||
[guide]: http://zguide.zeromq.org/page:all#Divide-and-Conquer
|
|
@ -2,6 +2,8 @@
|
|||
|
||||
{{#include thread/crossbeam-spawn.md}}
|
||||
|
||||
{{#include thread/crossbeam-complex.md}}
|
||||
|
||||
{{#include thread/crossbeam-spsc.md}}
|
||||
|
||||
{{#include thread/global-mut-state.md}}
|
||||
|
|
Loading…
Reference in a new issue