remove watcher ctx

This commit is contained in:
Aleksey Kladov 2019-01-26 17:17:28 +03:00
parent bf98fc609e
commit 9f16892b94

View file

@ -13,8 +13,6 @@ use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watc
use crate::{RootConfig, Roots, VfsRoot}; use crate::{RootConfig, Roots, VfsRoot};
type Result<T> = std::result::Result<T, crossbeam_channel::SendError<TaskResult>>;
pub(crate) enum Task { pub(crate) enum Task {
AddRoot { AddRoot {
root: VfsRoot, root: VfsRoot,
@ -62,7 +60,6 @@ impl Worker {
pub(crate) fn start(roots: Arc<Roots>) -> Worker { pub(crate) fn start(roots: Arc<Roots>) -> Worker {
// This is a pretty elaborate setup of threads & channels! It is // This is a pretty elaborate setup of threads & channels! It is
// explained by the following concerns: // explained by the following concerns:
// * we need to burn a thread translating from notify's mpsc to // * we need to burn a thread translating from notify's mpsc to
// crossbeam_channel. // crossbeam_channel.
// * we want to read all files from a single thread, to gurantee that // * we want to read all files from a single thread, to gurantee that
@ -79,48 +76,57 @@ impl Worker {
let (notify_sender, notify_receiver) = mpsc::channel(); let (notify_sender, notify_receiver) = mpsc::channel();
// These are the corresponding crossbeam channels // These are the corresponding crossbeam channels
let (watcher_sender, watcher_receiver) = unbounded(); let (watcher_sender, watcher_receiver) = unbounded();
let watcher = notify::watcher(notify_sender, WATCHER_DELAY)
let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
.map_err(|e| log::error!("failed to spawn notify {}", e)) .map_err(|e| log::error!("failed to spawn notify {}", e))
.ok(); .ok();
let mut ctx = WatcherCtx { // Start a silly thread to tranform between two channels
roots,
watcher,
sender: output_sender,
};
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
let _ = notify_receiver notify_receiver
.into_iter() .into_iter()
// forward relevant events only .for_each(|event| convert_notify_event(event, &watcher_sender))
.for_each(|event| convert_notify_event(event, &watcher_sender));
}); });
// Process requests from the called or notifications from
// watcher until the caller says stop.
loop { loop {
select! { select! {
// Received request from the caller. If this channel is // Received request from the caller. If this channel is
// closed, we should shutdown everything. // closed, we should shutdown everything.
recv(input_receiver) -> t => match t { recv(input_receiver) -> t => match t {
Err(RecvError) => break, Err(RecvError) => {
Ok(Task::AddRoot { root, config }) => watch_root(&mut ctx, root, Arc::clone(&config)), drop(input_receiver);
break
},
Ok(Task::AddRoot { root, config }) => {
watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config));
}
}, },
// Watcher send us changes. If **this** channel is // Watcher send us changes. If **this** channel is
// closed, the watcher has died, which indicates a bug // closed, the watcher has died, which indicates a bug
// -- escalate! // -- escalate!
recv(watcher_receiver) -> event => match event { recv(watcher_receiver) -> event => match event {
Err(RecvError) => panic!("watcher is dead"), Err(RecvError) => panic!("watcher is dead"),
Ok((path, change)) => WatcherCtx::handle_change(&mut ctx, path, change).unwrap(), Ok((path, change)) => {
handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
}
}, },
} }
} }
drop(ctx.watcher.take()); // Stopped the watcher
drop(ctx); drop(watcher.take());
let res2 = thread.join(); // Drain pending events: we are not inrerested in them anyways!
match &res2 { watcher_receiver.into_iter().for_each(|_| ());
let res = thread.join();
match &res {
Ok(()) => log::info!("... Watcher terminated with ok"), Ok(()) => log::info!("... Watcher terminated with ok"),
Err(_) => log::error!("... Watcher terminated with err"), Err(_) => log::error!("... Watcher terminated with err"),
} }
res2.unwrap(); res.unwrap();
}, },
); );
Worker { Worker {
worker, worker,
worker_handle, worker_handle,
@ -141,9 +147,14 @@ impl Worker {
} }
} }
fn watch_root(woker: &mut WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) { fn watch_root(
watcher: Option<&mut RecommendedWatcher>,
sender: &Sender<TaskResult>,
root: VfsRoot,
config: Arc<RootConfig>,
) {
log::debug!("loading {} ...", config.root.as_path().display()); log::debug!("loading {} ...", config.root.as_path().display());
let files = watch_recursive(woker.watcher.as_mut(), config.root.as_path(), &*config) let files = watch_recursive(watcher, config.root.as_path(), &*config)
.into_iter() .into_iter()
.filter_map(|path| { .filter_map(|path| {
let abs_path = path.to_path(&config.root); let abs_path = path.to_path(&config.root);
@ -151,20 +162,14 @@ fn watch_root(woker: &mut WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) {
Some((path, text)) Some((path, text))
}) })
.collect(); .collect();
woker sender
.sender
.send(TaskResult::BulkLoadRoot { root, files }) .send(TaskResult::BulkLoadRoot { root, files })
.unwrap(); .unwrap();
log::debug!("... loaded {}", config.root.as_path().display()); log::debug!("... loaded {}", config.root.as_path().display());
} }
struct WatcherCtx {
roots: Arc<Roots>,
watcher: Option<RecommendedWatcher>,
sender: Sender<TaskResult>,
}
fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) {
// forward relevant events only
match event { match event {
DebouncedEvent::NoticeWrite(_) DebouncedEvent::NoticeWrite(_)
| DebouncedEvent::NoticeRemove(_) | DebouncedEvent::NoticeRemove(_)
@ -194,48 +199,55 @@ fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeK
} }
} }
impl WatcherCtx { fn handle_change(
fn handle_change(&mut self, path: PathBuf, kind: ChangeKind) -> Result<()> { watcher: Option<&mut RecommendedWatcher>,
let (root, rel_path) = match self.roots.find(&path) { sender: &Sender<TaskResult>,
None => return Ok(()), roots: &Roots,
Some(it) => it, path: PathBuf,
}; kind: ChangeKind,
let config = &self.roots[root]; ) {
match kind { let (root, rel_path) = match roots.find(&path) {
ChangeKind::Create => { None => return,
let mut paths = Vec::new(); Some(it) => it,
if path.is_dir() { };
paths.extend(watch_recursive(self.watcher.as_mut(), &path, &config)); let config = &roots[root];
} else { match kind {
paths.push(rel_path); ChangeKind::Create => {
} let mut paths = Vec::new();
paths if path.is_dir() {
.into_iter() paths.extend(watch_recursive(watcher, &path, &config));
.filter_map(|rel_path| { } else {
let abs_path = rel_path.to_path(&config.root); paths.push(rel_path);
let text = read_to_string(&abs_path)?;
Some((rel_path, text))
})
.try_for_each(|(path, text)| {
self.sender
.send(TaskResult::AddSingleFile { root, path, text })
})?
} }
ChangeKind::Write => { paths
if let Some(text) = read_to_string(&path) { .into_iter()
self.sender.send(TaskResult::ChangeSingleFile { .filter_map(|rel_path| {
let abs_path = rel_path.to_path(&config.root);
let text = read_to_string(&abs_path)?;
Some((rel_path, text))
})
.try_for_each(|(path, text)| {
sender.send(TaskResult::AddSingleFile { root, path, text })
})
.unwrap()
}
ChangeKind::Write => {
if let Some(text) = read_to_string(&path) {
sender
.send(TaskResult::ChangeSingleFile {
root, root,
path: rel_path, path: rel_path,
text, text,
})?; })
} .unwrap();
} }
ChangeKind::Remove => self.sender.send(TaskResult::RemoveSingleFile { }
ChangeKind::Remove => sender
.send(TaskResult::RemoveSingleFile {
root, root,
path: rel_path, path: rel_path,
})?, })
} .unwrap(),
Ok(())
} }
} }