671: Make VFS slightly less super obscure r=vemoo a=matklad

I've decided to better understand what we do in VFS, and this turns out to be really hard. Juggling threads and channels is one of the most unfortunately arcane bits of Rust...

I had some success though by flattening the structure so that all channel & thread creation routines are on one screen.
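For reference, here is roughly the shape this flattens to -- a minimal, self-contained sketch with made-up names (`Request`, `Event`), not the actual VFS types, assuming the `crossbeam-channel` crate: one thread bridges a `std::mpsc` receiver (the kind of channel notify wants) into a crossbeam channel, and a single `select!` loop owns all the teardown logic.

```rust
// Sketch only: one bridge thread pumps std::mpsc into crossbeam, and a single
// select! loop handles both caller requests and watcher events in one place.
use std::{sync::mpsc, thread};

use crossbeam_channel::{select, unbounded, RecvError};

enum Request {
    Stop,
}

enum Event {
    Changed(String),
}

fn main() {
    // The "notify"-style producer only knows how to talk std::mpsc...
    let (std_tx, std_rx) = mpsc::channel::<Event>();
    // ...so burn one thread translating it into a crossbeam channel.
    let (event_tx, event_rx) = unbounded::<Event>();
    let bridge = thread::spawn(move || {
        for ev in std_rx {
            if event_tx.send(ev).is_err() {
                break;
            }
        }
    });

    let (req_tx, req_rx) = unbounded::<Request>();

    // Pretend something produces an event and the caller eventually stops us.
    std_tx.send(Event::Changed("src/lib.rs".to_string())).unwrap();
    req_tx.send(Request::Stop).unwrap();

    // The single select loop: all shutdown logic lives here.
    loop {
        select! {
            recv(req_rx) -> msg => match msg {
                Ok(Request::Stop) | Err(RecvError) => break,
            },
            recv(event_rx) -> ev => match ev {
                Ok(Event::Changed(path)) => println!("changed: {}", path),
                Err(RecvError) => panic!("event producer died"),
            },
        }
    }

    // Dropping the std sender ends the bridge thread; join it for a clean exit.
    drop(std_tx);
    bridge.join().unwrap();
}
```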

r? @vemoo 

Co-authored-by: Aleksey Kladov <aleksey.kladov@gmail.com>
bors[bot] 2019-01-26 15:13:44 +00:00
commit a8d32c4d1a
3 changed files with 295 additions and 350 deletions


@@ -1,19 +1,22 @@
use std::{fs, sync::Arc, thread};
use crossbeam_channel::{Receiver, Sender};
use std::{
fs,
thread,
path::{Path, PathBuf},
sync::{mpsc, Arc},
time::Duration,
};
use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select};
use relative_path::RelativePathBuf;
use thread_worker::WorkerHandle;
use walkdir::WalkDir;
use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
mod watcher;
use watcher::Watcher;
use crate::{RootFilter, Roots, VfsRoot};
use crate::{RootConfig, Roots, VfsRoot};
pub(crate) enum Task {
AddRoot {
root: VfsRoot,
filter: Arc<RootFilter>,
config: Arc<RootConfig>,
},
}
@@ -39,6 +42,15 @@ pub enum TaskResult {
},
}
#[derive(Debug)]
enum ChangeKind {
Create,
Write,
Remove,
}
const WATCHER_DELAY: Duration = Duration::from_millis(250);
pub(crate) struct Worker {
worker: thread_worker::Worker<Task, TaskResult>,
worker_handle: WorkerHandle,
@@ -46,24 +58,75 @@ pub(crate) struct Worker {
impl Worker {
pub(crate) fn start(roots: Arc<Roots>) -> Worker {
let (worker, worker_handle) =
thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
let mut watcher = match Watcher::start(roots, output_sender.clone()) {
Ok(w) => Some(w),
Err(e) => {
log::error!("could not start watcher: {}", e);
None
}
};
let res = input_receiver
// This is a pretty elaborate setup of threads & channels! It is
// explained by the following concerns:
// * we need to burn a thread translating from notify's mpsc to
// crossbeam_channel.
// * we want to read all files from a single thread, to guarantee that
// we always get fresher versions and never go back in time.
// * we want to tear down everything neatly during shutdown.
let (worker, worker_handle) = thread_worker::spawn(
"vfs",
128,
// These are the channels we use to communicate with the outside world.
// If `input_receiver` is closed we need to tear ourselves down.
// `output_sender` should not be closed unless the parent died.
move |input_receiver, output_sender| {
// These are `std` channels notify will send events to
let (notify_sender, notify_receiver) = mpsc::channel();
// These are the corresponding crossbeam channels
let (watcher_sender, watcher_receiver) = unbounded();
let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
.map_err(|e| log::error!("failed to spawn notify {}", e))
.ok();
// Start a silly thread to transform between two channels
let thread = thread::spawn(move || {
notify_receiver
.into_iter()
.filter_map(|t| handle_task(t, &mut watcher))
.try_for_each(|it| output_sender.send(it));
if let Some(watcher) = watcher {
let _ = watcher.shutdown();
}
res.unwrap()
.for_each(|event| convert_notify_event(event, &watcher_sender))
});
// Process requests from the caller or notifications from
// watcher until the caller says stop.
loop {
select! {
// Received request from the caller. If this channel is
// closed, we should shutdown everything.
recv(input_receiver) -> t => match t {
Err(RecvError) => {
drop(input_receiver);
break
},
Ok(Task::AddRoot { root, config }) => {
watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config));
}
},
// Watcher sends us changes. If **this** channel is
// closed, the watcher has died, which indicates a bug
// -- escalate!
recv(watcher_receiver) -> event => match event {
Err(RecvError) => panic!("watcher is dead"),
Ok((path, change)) => {
handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
}
},
}
}
// Stop the watcher
drop(watcher.take());
// Drain pending events: we are not interested in them anyway!
watcher_receiver.into_iter().for_each(|_| ());
let res = thread.join();
match &res {
Ok(()) => log::info!("... Watcher terminated with ok"),
Err(_) => log::error!("... Watcher terminated with err"),
}
res.unwrap();
},
);
Worker {
worker,
worker_handle,
@@ -84,46 +147,142 @@ impl Worker {
}
}
fn handle_task(task: Task, watcher: &mut Option<Watcher>) -> Option<TaskResult> {
match task {
Task::AddRoot { root, filter } => {
if let Some(watcher) = watcher {
watcher.watch_root(&filter)
fn watch_root(
watcher: Option<&mut RecommendedWatcher>,
sender: &Sender<TaskResult>,
root: VfsRoot,
config: Arc<RootConfig>,
) {
log::debug!("loading {} ...", config.root.as_path().display());
let files = watch_recursive(watcher, config.root.as_path(), &*config)
.into_iter()
.filter_map(|path| {
let abs_path = path.to_path(&config.root);
let text = read_to_string(&abs_path)?;
Some((path, text))
})
.collect();
sender
.send(TaskResult::BulkLoadRoot { root, files })
.unwrap();
log::debug!("... loaded {}", config.root.as_path().display());
}
fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) {
// forward relevant events only
match event {
DebouncedEvent::NoticeWrite(_)
| DebouncedEvent::NoticeRemove(_)
| DebouncedEvent::Chmod(_) => {
// ignore
}
log::debug!("loading {} ...", filter.root.as_path().display());
let files = load_root(filter.as_ref());
log::debug!("... loaded {}", filter.root.as_path().display());
Some(TaskResult::BulkLoadRoot { root, files })
DebouncedEvent::Rescan => {
// TODO rescan all roots
}
DebouncedEvent::Create(path) => {
sender.send((path, ChangeKind::Create)).unwrap();
}
DebouncedEvent::Write(path) => {
sender.send((path, ChangeKind::Write)).unwrap();
}
DebouncedEvent::Remove(path) => {
sender.send((path, ChangeKind::Remove)).unwrap();
}
DebouncedEvent::Rename(src, dst) => {
sender.send((src, ChangeKind::Remove)).unwrap();
sender.send((dst, ChangeKind::Create)).unwrap();
}
DebouncedEvent::Error(err, path) => {
// TODO should we reload the file contents?
log::warn!("watcher error \"{}\", {:?}", err, path);
}
}
}
fn load_root(filter: &RootFilter) -> Vec<(RelativePathBuf, String)> {
let mut res = Vec::new();
for entry in WalkDir::new(&filter.root)
fn handle_change(
watcher: Option<&mut RecommendedWatcher>,
sender: &Sender<TaskResult>,
roots: &Roots,
path: PathBuf,
kind: ChangeKind,
) {
let (root, rel_path) = match roots.find(&path) {
None => return,
Some(it) => it,
};
let config = &roots[root];
match kind {
ChangeKind::Create => {
let mut paths = Vec::new();
if path.is_dir() {
paths.extend(watch_recursive(watcher, &path, &config));
} else {
paths.push(rel_path);
}
paths
.into_iter()
.filter_entry(filter.entry_filter())
{
let entry = match entry {
Ok(entry) => entry,
Err(e) => {
log::warn!("watcher error: {}", e);
continue;
.filter_map(|rel_path| {
let abs_path = rel_path.to_path(&config.root);
let text = read_to_string(&abs_path)?;
Some((rel_path, text))
})
.try_for_each(|(path, text)| {
sender.send(TaskResult::AddSingleFile { root, path, text })
})
.unwrap()
}
};
if !entry.file_type().is_file() {
continue;
ChangeKind::Write => {
if let Some(text) = read_to_string(&path) {
sender
.send(TaskResult::ChangeSingleFile {
root,
path: rel_path,
text,
})
.unwrap();
}
let path = entry.path();
let text = match fs::read_to_string(path) {
Ok(text) => text,
Err(e) => {
log::warn!("watcher error: {}", e);
continue;
}
};
let path = RelativePathBuf::from_path(path.strip_prefix(&filter.root).unwrap()).unwrap();
res.push((path.to_owned(), text))
ChangeKind::Remove => sender
.send(TaskResult::RemoveSingleFile {
root,
path: rel_path,
})
.unwrap(),
}
res
}
fn watch_recursive(
mut watcher: Option<&mut RecommendedWatcher>,
dir: &Path,
config: &RootConfig,
) -> Vec<RelativePathBuf> {
let mut files = Vec::new();
for entry in WalkDir::new(dir)
.into_iter()
.filter_entry(|it| config.contains(it.path()).is_some())
.filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok())
{
if entry.file_type().is_dir() {
if let Some(watcher) = &mut watcher {
watch_one(watcher, entry.path());
}
} else {
let path = config.contains(entry.path()).unwrap();
files.push(path.to_owned());
}
}
files
}
fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) {
match watcher.watch(dir, RecursiveMode::NonRecursive) {
Ok(()) => log::debug!("watching \"{}\"", dir.display()),
Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
}
}
fn read_to_string(path: &Path) -> Option<String> {
fs::read_to_string(&path)
.map_err(|e| log::warn!("failed to read file {}", e))
.ok()
}


@@ -1,200 +0,0 @@
use crate::{io, RootFilter, Roots, VfsRoot};
use crossbeam_channel::Sender;
use drop_bomb::DropBomb;
use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
use parking_lot::Mutex;
use std::{
fs,
path::{Path, PathBuf},
sync::{mpsc, Arc},
thread,
time::Duration,
};
use walkdir::WalkDir;
#[derive(Debug)]
enum ChangeKind {
Create,
Write,
Remove,
}
const WATCHER_DELAY: Duration = Duration::from_millis(250);
pub(crate) struct Watcher {
thread: thread::JoinHandle<()>,
bomb: DropBomb,
watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
}
impl Watcher {
pub(crate) fn start(
roots: Arc<Roots>,
output_sender: Sender<io::TaskResult>,
) -> Result<Watcher, Box<std::error::Error>> {
let (input_sender, input_receiver) = mpsc::channel();
let watcher = Arc::new(Mutex::new(Some(notify::watcher(
input_sender,
WATCHER_DELAY,
)?)));
let sender = output_sender.clone();
let watcher_clone = watcher.clone();
let thread = thread::spawn(move || {
let worker = WatcherWorker {
roots,
watcher: watcher_clone,
sender,
};
input_receiver
.into_iter()
// forward relevant events only
.try_for_each(|change| worker.handle_debounced_event(change))
.unwrap()
});
Ok(Watcher {
thread,
watcher,
bomb: DropBomb::new(format!("Watcher was not shutdown")),
})
}
pub fn watch_root(&mut self, filter: &RootFilter) {
for res in WalkDir::new(&filter.root)
.into_iter()
.filter_entry(filter.entry_filter())
{
match res {
Ok(entry) => {
if entry.file_type().is_dir() {
watch_one(self.watcher.as_ref(), entry.path());
}
}
Err(e) => log::warn!("watcher error: {}", e),
}
}
}
pub fn shutdown(mut self) -> thread::Result<()> {
self.bomb.defuse();
drop(self.watcher.lock().take());
let res = self.thread.join();
match &res {
Ok(()) => log::info!("... Watcher terminated with ok"),
Err(_) => log::error!("... Watcher terminated with err"),
}
res
}
}
struct WatcherWorker {
watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
roots: Arc<Roots>,
sender: Sender<io::TaskResult>,
}
impl WatcherWorker {
fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), Box<std::error::Error>> {
match ev {
DebouncedEvent::NoticeWrite(_)
| DebouncedEvent::NoticeRemove(_)
| DebouncedEvent::Chmod(_) => {
// ignore
}
DebouncedEvent::Rescan => {
// TODO rescan all roots
}
DebouncedEvent::Create(path) => {
self.handle_change(path, ChangeKind::Create);
}
DebouncedEvent::Write(path) => {
self.handle_change(path, ChangeKind::Write);
}
DebouncedEvent::Remove(path) => {
self.handle_change(path, ChangeKind::Remove);
}
DebouncedEvent::Rename(src, dst) => {
self.handle_change(src, ChangeKind::Remove);
self.handle_change(dst, ChangeKind::Create);
}
DebouncedEvent::Error(err, path) => {
// TODO should we reload the file contents?
log::warn!("watcher error \"{}\", {:?}", err, path);
}
}
Ok(())
}
fn handle_change(&self, path: PathBuf, kind: ChangeKind) {
if let Err(e) = self.try_handle_change(path, kind) {
log::warn!("watcher error: {}", e)
}
}
fn try_handle_change(
&self,
path: PathBuf,
kind: ChangeKind,
) -> Result<(), Box<std::error::Error>> {
let (root, rel_path) = match self.roots.find(&path) {
Some(x) => x,
None => return Ok(()),
};
match kind {
ChangeKind::Create => {
if path.is_dir() {
self.watch_recursive(&path, root);
} else {
let text = fs::read_to_string(&path)?;
self.sender.send(io::TaskResult::AddSingleFile {
root,
path: rel_path,
text,
})?
}
}
ChangeKind::Write => {
let text = fs::read_to_string(&path)?;
self.sender.send(io::TaskResult::ChangeSingleFile {
root,
path: rel_path,
text,
})?
}
ChangeKind::Remove => self.sender.send(io::TaskResult::RemoveSingleFile {
root,
path: rel_path,
})?,
}
Ok(())
}
fn watch_recursive(&self, dir: &Path, root: VfsRoot) {
let filter = &self.roots[root];
for res in WalkDir::new(dir)
.into_iter()
.filter_entry(filter.entry_filter())
{
match res {
Ok(entry) => {
if entry.file_type().is_dir() {
watch_one(self.watcher.as_ref(), entry.path());
} else {
// emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching
// emit as create because we haven't seen it yet
self.handle_change(entry.path().to_path_buf(), ChangeKind::Create);
}
}
Err(e) => log::warn!("watcher error: {}", e),
}
}
}
}
fn watch_one(watcher: &Mutex<Option<RecommendedWatcher>>, dir: &Path) {
if let Some(watcher) = watcher.lock().as_mut() {
match watcher.watch(dir, RecursiveMode::NonRecursive) {
Ok(()) => log::debug!("watching \"{}\"", dir.display()),
Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
}
}
}


@@ -18,94 +18,78 @@ mod io;
use std::{
cmp::Reverse,
fmt, fs, mem,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::Arc,
thread,
};
use crossbeam_channel::Receiver;
use ra_arena::{impl_arena_id, Arena, RawId};
use ra_arena::{impl_arena_id, Arena, RawId, map::ArenaMap};
use relative_path::{Component, RelativePath, RelativePathBuf};
use rustc_hash::{FxHashMap, FxHashSet};
use walkdir::DirEntry;
pub use crate::io::TaskResult as VfsTask;
use io::{TaskResult, Worker};
/// `RootFilter` is a predicate that checks if a file can belong to a root. If
/// several filters match a file (nested dirs), the most nested one wins.
pub(crate) struct RootFilter {
root: PathBuf,
filter: fn(&Path, &RelativePath) -> bool,
excluded_dirs: Vec<PathBuf>,
}
impl RootFilter {
fn new(root: PathBuf, excluded_dirs: Vec<PathBuf>) -> RootFilter {
RootFilter {
root,
filter: default_filter,
excluded_dirs,
}
}
/// Check if this root can contain `path`. NB: even if this returns
/// true, the `path` might actually be contained in some nested root.
pub(crate) fn can_contain(&self, path: &Path) -> Option<RelativePathBuf> {
let rel_path = path.strip_prefix(&self.root).ok()?;
let rel_path = RelativePathBuf::from_path(rel_path).ok()?;
if !(self.filter)(path, rel_path.as_relative_path()) {
return None;
}
Some(rel_path)
}
pub(crate) fn entry_filter<'a>(&'a self) -> impl FnMut(&DirEntry) -> bool + 'a {
move |entry: &DirEntry| {
if entry.file_type().is_dir() && self.excluded_dirs.iter().any(|it| it == entry.path())
{
// do not walk nested roots
false
} else {
self.can_contain(entry.path()).is_some()
}
}
}
}
pub(crate) fn default_filter(path: &Path, rel_path: &RelativePath) -> bool {
if path.is_dir() {
for (i, c) in rel_path.components().enumerate() {
if let Component::Normal(c) = c {
// TODO hardcoded for now
if (i == 0 && c == "target") || c == ".git" || c == "node_modules" {
return false;
}
}
}
true
} else {
rel_path.extension() == Some("rs")
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct VfsRoot(pub RawId);
impl_arena_id!(VfsRoot);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct VfsFile(pub RawId);
impl_arena_id!(VfsFile);
struct VfsFileData {
root: VfsRoot,
path: RelativePathBuf,
is_overlayed: bool,
text: Arc<String>,
/// Describes the contents of a single source root.
///
/// `RootConfig` can be thought of as a glob pattern like `src/**.rs` which
/// specifies the source root, or as a function which takes a `PathBuf` and
/// returns `true` iff the path belongs to the source root.
pub(crate) struct RootConfig {
root: PathBuf,
excluded_dirs: Vec<PathBuf>,
}
pub(crate) struct Roots {
roots: Arena<VfsRoot, Arc<RootFilter>>,
roots: Arena<VfsRoot, Arc<RootConfig>>,
}
impl std::ops::Deref for Roots {
type Target = Arena<VfsRoot, Arc<RootConfig>>;
fn deref(&self) -> &Self::Target {
&self.roots
}
}
impl RootConfig {
fn new(root: PathBuf, excluded_dirs: Vec<PathBuf>) -> RootConfig {
RootConfig {
root,
excluded_dirs,
}
}
/// Checks if the root contains a path and returns the root-relative path.
pub(crate) fn contains(&self, path: &Path) -> Option<RelativePathBuf> {
// First, check excluded dirs
if self.excluded_dirs.iter().any(|it| path.starts_with(it)) {
return None;
}
let rel_path = path.strip_prefix(&self.root).ok()?;
let rel_path = RelativePathBuf::from_path(rel_path).ok()?;
// Ignore some common directories.
//
// FIXME: don't hard-code, specify at source-root creation time using
// gitignore
for (i, c) in rel_path.components().enumerate() {
if let Component::Normal(c) = c {
if (i == 0 && c == "target") || c == ".git" || c == "node_modules" {
return None;
}
}
}
if path.is_file() && rel_path.extension() != Some("rs") {
return None;
}
Some(rel_path)
}
}
impl Roots {
@@ -120,59 +104,61 @@ impl Roots {
.map(|it| it.clone())
.collect::<Vec<_>>();
let root_filter = Arc::new(RootFilter::new(path.clone(), nested_roots));
let config = Arc::new(RootConfig::new(path.clone(), nested_roots));
roots.alloc(root_filter.clone());
roots.alloc(config.clone());
}
Roots { roots }
}
pub(crate) fn find(&self, path: &Path) -> Option<(VfsRoot, RelativePathBuf)> {
self.roots
.iter()
.find_map(|(root, data)| data.can_contain(path).map(|it| (root, it)))
.find_map(|(root, data)| data.contains(path).map(|it| (root, it)))
}
}
impl Deref for Roots {
type Target = Arena<VfsRoot, Arc<RootFilter>>;
fn deref(&self) -> &Self::Target {
&self.roots
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct VfsFile(pub RawId);
impl_arena_id!(VfsFile);
impl DerefMut for Roots {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.roots
}
struct VfsFileData {
root: VfsRoot,
path: RelativePathBuf,
is_overlayed: bool,
text: Arc<String>,
}
pub struct Vfs {
roots: Arc<Roots>,
files: Arena<VfsFile, VfsFileData>,
root2files: FxHashMap<VfsRoot, FxHashSet<VfsFile>>,
root2files: ArenaMap<VfsRoot, FxHashSet<VfsFile>>,
pending_changes: Vec<VfsChange>,
worker: Worker,
}
impl fmt::Debug for Vfs {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("Vfs { ... }")
f.debug_struct("Vfs")
.field("n_roots", &self.roots.len())
.field("n_files", &self.files.len())
.field("n_pending_changes", &self.pending_changes.len())
.finish()
}
}
impl Vfs {
pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) {
let roots = Arc::new(Roots::new(roots));
let worker = io::Worker::start(roots.clone());
let mut root2files = FxHashMap::default();
let worker = io::Worker::start(Arc::clone(&roots));
let mut root2files = ArenaMap::default();
for (root, filter) in roots.iter() {
for (root, config) in roots.iter() {
root2files.insert(root, Default::default());
worker
.sender()
.send(io::Task::AddRoot {
root,
filter: filter.clone(),
config: Arc::clone(config),
})
.unwrap();
}
@@ -242,7 +228,7 @@ impl Vfs {
let mut cur_files = Vec::new();
// While we were scanning the root in the background, a file might have
// been open in the editor, so we need to account for that.
let exising = self.root2files[&root]
let exising = self.root2files[root]
.iter()
.map(|&file| (self.files[file].path.clone(), file))
.collect::<FxHashMap<_, _>>();
@@ -384,7 +370,7 @@ impl Vfs {
is_overlayed,
};
let file = self.files.alloc(data);
self.root2files.get_mut(&root).unwrap().insert(file);
self.root2files.get_mut(root).unwrap().insert(file);
file
}
@@ -399,7 +385,7 @@ impl Vfs {
self.files[file].text = Default::default();
self.files[file].path = Default::default();
let root = self.files[file].root;
let removed = self.root2files.get_mut(&root).unwrap().remove(&file);
let removed = self.root2files.get_mut(root).unwrap().remove(&file);
assert!(removed);
}
@@ -410,7 +396,7 @@ impl Vfs {
}
fn find_file(&self, root: VfsRoot, path: &RelativePath) -> Option<VfsFile> {
self.root2files[&root]
self.root2files[root]
.iter()
.map(|&it| it)
.find(|&file| self.files[file].path == path)