2018-12-18 09:45:20 +00:00
|
|
|
//! Small utility to correctly spawn crossbeam-channel based worker threads.
|
|
|
|
|
2018-10-20 19:38:47 +00:00
|
|
|
use std::thread;
|
|
|
|
|
2018-12-30 20:23:31 +00:00
|
|
|
use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError};
|
2018-10-15 21:44:23 +00:00
|
|
|
use drop_bomb::DropBomb;
|
2018-09-02 11:46:15 +00:00
|
|
|
|
2018-09-08 10:15:01 +00:00
|
|
|
pub struct Worker<I, O> {
|
|
|
|
pub inp: Sender<I>,
|
|
|
|
pub out: Receiver<O>,
|
|
|
|
}
|
|
|
|
|
2018-12-18 09:45:20 +00:00
|
|
|
pub struct WorkerHandle {
|
|
|
|
name: &'static str,
|
|
|
|
thread: thread::JoinHandle<()>,
|
|
|
|
bomb: DropBomb,
|
|
|
|
}
|
2018-09-08 10:15:01 +00:00
|
|
|
|
2018-12-18 09:45:20 +00:00
|
|
|
pub fn spawn<I, O, F>(name: &'static str, buf: usize, f: F) -> (Worker<I, O>, WorkerHandle)
|
|
|
|
where
|
|
|
|
F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
|
|
|
|
I: Send + 'static,
|
|
|
|
O: Send + 'static,
|
|
|
|
{
|
|
|
|
let (worker, inp_r, out_s) = worker_chan(buf);
|
|
|
|
let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s));
|
|
|
|
(worker, watcher)
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<I, O> Worker<I, O> {
|
|
|
|
/// Stops the worker. Returns the message receiver to fetch results which
|
|
|
|
/// have become ready before the worker is stopped.
|
2018-12-18 10:18:55 +00:00
|
|
|
pub fn shutdown(self) -> Receiver<O> {
|
2018-09-08 10:15:01 +00:00
|
|
|
self.out
|
|
|
|
}
|
|
|
|
|
2018-12-30 20:23:31 +00:00
|
|
|
pub fn send(&self, item: I) -> Result<(), SendError<I>> {
|
2018-09-08 10:15:01 +00:00
|
|
|
self.inp.send(item)
|
|
|
|
}
|
2018-12-30 20:23:31 +00:00
|
|
|
pub fn recv(&self) -> Result<O, RecvError> {
|
2018-12-19 12:04:15 +00:00
|
|
|
self.out.recv()
|
|
|
|
}
|
2018-09-08 10:15:01 +00:00
|
|
|
}
|
|
|
|
|
2018-12-18 09:45:20 +00:00
|
|
|
impl WorkerHandle {
|
|
|
|
fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle {
|
2018-09-02 11:46:15 +00:00
|
|
|
let thread = thread::spawn(f);
|
2018-12-18 09:45:20 +00:00
|
|
|
WorkerHandle {
|
2018-09-02 11:46:15 +00:00
|
|
|
name,
|
|
|
|
thread,
|
2018-12-18 10:18:55 +00:00
|
|
|
bomb: DropBomb::new(format!("WorkerHandle {} was not shutdown", name)),
|
2018-09-02 11:46:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-12-18 10:18:55 +00:00
|
|
|
pub fn shutdown(mut self) -> thread::Result<()> {
|
2018-12-06 18:03:39 +00:00
|
|
|
log::info!("waiting for {} to finish ...", self.name);
|
2018-09-02 11:46:15 +00:00
|
|
|
let name = self.name;
|
|
|
|
self.bomb.defuse();
|
2018-12-18 09:45:20 +00:00
|
|
|
let res = self.thread.join();
|
2018-09-02 11:46:15 +00:00
|
|
|
match &res {
|
2018-12-06 18:03:39 +00:00
|
|
|
Ok(()) => log::info!("... {} terminated with ok", name),
|
|
|
|
Err(_) => log::error!("... {} terminated with err", name),
|
2018-09-02 11:46:15 +00:00
|
|
|
}
|
|
|
|
res
|
|
|
|
}
|
|
|
|
}
|
2018-09-08 09:36:02 +00:00
|
|
|
|
2019-02-11 16:18:27 +00:00
|
|
|
/// Sets up worker channels in a deadlock-avoiding way.
|
2018-09-08 09:36:02 +00:00
|
|
|
/// If one sets both input and output buffers to a fixed size,
|
|
|
|
/// a worker might get stuck.
|
2018-10-16 18:08:52 +00:00
|
|
|
fn worker_chan<I, O>(buf: usize) -> (Worker<I, O>, Receiver<I>, Sender<O>) {
|
2018-09-08 09:36:02 +00:00
|
|
|
let (input_sender, input_receiver) = bounded::<I>(buf);
|
|
|
|
let (output_sender, output_receiver) = unbounded::<O>();
|
2019-02-08 11:49:43 +00:00
|
|
|
(Worker { inp: input_sender, out: output_receiver }, input_receiver, output_sender)
|
2018-09-08 09:36:02 +00:00
|
|
|
}
|