diff --git a/crates/nu-cli/tests/completions/mod.rs b/crates/nu-cli/tests/completions/mod.rs index 2c0a98688c..001a6c0599 100644 --- a/crates/nu-cli/tests/completions/mod.rs +++ b/crates/nu-cli/tests/completions/mod.rs @@ -942,7 +942,7 @@ fn flag_completions() { // Test completions for the 'ls' flags let suggestions = completer.complete("ls -", 4); - assert_eq!(16, suggestions.len()); + assert_eq!(18, suggestions.len()); let expected: Vec = vec![ "--all".into(), @@ -953,6 +953,7 @@ fn flag_completions() { "--long".into(), "--mime-type".into(), "--short-names".into(), + "--threads".into(), "-D".into(), "-a".into(), "-d".into(), @@ -961,6 +962,7 @@ fn flag_completions() { "-l".into(), "-m".into(), "-s".into(), + "-t".into(), ]; // Match results diff --git a/crates/nu-command/src/filesystem/ls.rs b/crates/nu-command/src/filesystem/ls.rs index d198816727..465fae16ff 100644 --- a/crates/nu-command/src/filesystem/ls.rs +++ b/crates/nu-command/src/filesystem/ls.rs @@ -8,11 +8,14 @@ use nu_glob::MatchOptions; use nu_path::{expand_path_with, expand_to_real_path}; use nu_protocol::{DataSource, NuGlob, PipelineMetadata, Signals}; use pathdiff::diff_paths; +use rayon::prelude::*; #[cfg(unix)] use std::os::unix::fs::PermissionsExt; use std::{ path::PathBuf, + sync::mpsc, + sync::{Arc, Mutex}, time::{SystemTime, UNIX_EPOCH}, }; @@ -28,6 +31,7 @@ struct Args { du: bool, directory: bool, use_mime_type: bool, + use_threads: bool, call_span: Span, } @@ -75,6 +79,7 @@ impl Command for Ls { Some('D'), ) .switch("mime-type", "Show mime-type in type column instead of 'file' (based on filenames only; files' contents are not examined)", Some('m')) + .switch("threads", "Use multiple threads to list contents. Output will be non-deterministic.", Some('t')) .category(Category::FileSystem) } @@ -92,6 +97,7 @@ impl Command for Ls { let du = call.has_flag(engine_state, stack, "du")?; let directory = call.has_flag(engine_state, stack, "directory")?; let use_mime_type = call.has_flag(engine_state, stack, "mime-type")?; + let use_threads = call.has_flag(engine_state, stack, "threads")?; let call_span = call.head; #[allow(deprecated)] let cwd = current_dir(engine_state, stack)?; @@ -104,6 +110,7 @@ impl Command for Ls { du, directory, use_mime_type, + use_threads, call_span, }; @@ -114,22 +121,24 @@ impl Command for Ls { Some(pattern_arg) }; match input_pattern_arg { - None => Ok(ls_for_one_pattern(None, args, engine_state.signals(), cwd)? - .into_pipeline_data_with_metadata( - call_span, - engine_state.signals().clone(), - PipelineMetadata { - data_source: DataSource::Ls, - content_type: None, - }, - )), + None => Ok( + ls_for_one_pattern(None, args, engine_state.signals().clone(), cwd)? + .into_pipeline_data_with_metadata( + call_span, + engine_state.signals().clone(), + PipelineMetadata { + data_source: DataSource::Ls, + content_type: None, + }, + ), + ), Some(pattern) => { let mut result_iters = vec![]; for pat in pattern { result_iters.push(ls_for_one_pattern( Some(pat), args, - engine_state.signals(), + engine_state.signals().clone(), cwd.clone(), )?) } @@ -213,9 +222,27 @@ impl Command for Ls { fn ls_for_one_pattern( pattern_arg: Option>, args: Args, - signals: &Signals, + signals: Signals, cwd: PathBuf, -) -> Result + Send>, ShellError> { +) -> Result { + fn create_pool(num_threads: usize) -> Result { + match rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .build() + { + Err(e) => Err(e).map_err(|e| ShellError::GenericError { + error: "Error creating thread pool".into(), + msg: e.to_string(), + span: Some(Span::unknown()), + help: None, + inner: vec![], + }), + Ok(pool) => Ok(pool), + } + } + + let (tx, rx) = mpsc::channel(); + let Args { all, long, @@ -224,6 +251,7 @@ fn ls_for_one_pattern( du, directory, use_mime_type, + use_threads, call_span, } = args; let pattern_arg = { @@ -281,7 +309,7 @@ fn ls_for_one_pattern( }); } if is_empty_dir(&tmp_expanded) { - return Ok(Box::new(vec![].into_iter())); + return Ok(Value::test_nothing().into_pipeline_data()); } just_read_dir = !(pat.item.is_expand() && pat.item.as_ref().contains(GLOB_CHARS)); } @@ -300,7 +328,7 @@ fn ls_for_one_pattern( if directory { (NuGlob::Expand(".".to_string()), false) } else if is_empty_dir(&cwd) { - return Ok(Box::new(vec![].into_iter())); + return Ok(Value::test_nothing().into_pipeline_data()); } else { (NuGlob::Expand("*".to_string()), false) } @@ -338,92 +366,130 @@ fn ls_for_one_pattern( }); } - let mut hidden_dirs = vec![]; + let hidden_dirs = Arc::new(Mutex::new(Vec::new())); - let signals = signals.clone(); - Ok(Box::new(paths_peek.filter_map(move |x| match x { - Ok(path) => { - let metadata = match std::fs::symlink_metadata(&path) { - Ok(metadata) => Some(metadata), - Err(_) => None, - }; - if path_contains_hidden_folder(&path, &hidden_dirs) { - return None; - } + let signals_clone = signals.clone(); - if !all && !hidden_dir_specified && is_hidden_dir(&path) { - if path.is_dir() { - hidden_dirs.push(path); - } - return None; - } + let pool = if use_threads { + let count = std::thread::available_parallelism()?.get(); + create_pool(count)? + } else { + create_pool(1)? + }; - let display_name = if short_names { - path.file_name().map(|os| os.to_string_lossy().to_string()) - } else if full_paths || absolute_path { - Some(path.to_string_lossy().to_string()) - } else if let Some(prefix) = &prefix { - if let Ok(remainder) = path.strip_prefix(prefix) { - if directory { - // When the path is the same as the cwd, path_diff should be "." - let path_diff = if let Some(path_diff_not_dot) = diff_paths(&path, &cwd) { - let path_diff_not_dot = path_diff_not_dot.to_string_lossy(); - if path_diff_not_dot.is_empty() { - ".".to_string() + pool.install(|| { + paths_peek + .par_bridge() + .filter_map(move |x| match x { + Ok(path) => { + let metadata = match std::fs::symlink_metadata(&path) { + Ok(metadata) => Some(metadata), + Err(_) => None, + }; + let hidden_dir_clone = Arc::clone(&hidden_dirs); + let mut hidden_dir_mutex = hidden_dir_clone + .lock() + .expect("Unable to acquire lock for hidden_dirs"); + if path_contains_hidden_folder(&path, &hidden_dir_mutex) { + return None; + } + + if !all && !hidden_dir_specified && is_hidden_dir(&path) { + if path.is_dir() { + hidden_dir_mutex.push(path); + drop(hidden_dir_mutex); + } + return None; + } + + let display_name = if short_names { + path.file_name().map(|os| os.to_string_lossy().to_string()) + } else if full_paths || absolute_path { + Some(path.to_string_lossy().to_string()) + } else if let Some(prefix) = &prefix { + if let Ok(remainder) = path.strip_prefix(prefix) { + if directory { + // When the path is the same as the cwd, path_diff should be "." + let path_diff = + if let Some(path_diff_not_dot) = diff_paths(&path, &cwd) { + let path_diff_not_dot = path_diff_not_dot.to_string_lossy(); + if path_diff_not_dot.is_empty() { + ".".to_string() + } else { + path_diff_not_dot.to_string() + } + } else { + path.to_string_lossy().to_string() + }; + + Some(path_diff) } else { - path_diff_not_dot.to_string() + let new_prefix = if let Some(pfx) = diff_paths(prefix, &cwd) { + pfx + } else { + prefix.to_path_buf() + }; + + Some(new_prefix.join(remainder).to_string_lossy().to_string()) } } else { - path.to_string_lossy().to_string() - }; - - Some(path_diff) + Some(path.to_string_lossy().to_string()) + } } else { - let new_prefix = if let Some(pfx) = diff_paths(prefix, &cwd) { - pfx - } else { - prefix.to_path_buf() - }; - - Some(new_prefix.join(remainder).to_string_lossy().to_string()) + Some(path.to_string_lossy().to_string()) } - } else { - Some(path.to_string_lossy().to_string()) - } - } else { - Some(path.to_string_lossy().to_string()) - } - .ok_or_else(|| ShellError::GenericError { - error: format!("Invalid file name: {:}", path.to_string_lossy()), - msg: "invalid file name".into(), - span: Some(call_span), - help: None, - inner: vec![], - }); + .ok_or_else(|| ShellError::GenericError { + error: format!("Invalid file name: {:}", path.to_string_lossy()), + msg: "invalid file name".into(), + span: Some(call_span), + help: None, + inner: vec![], + }); - match display_name { - Ok(name) => { - let entry = dir_entry_dict( - &path, - &name, - metadata.as_ref(), - call_span, - long, - du, - &signals, - use_mime_type, - args.full_paths, - ); - match entry { - Ok(value) => Some(value), + match display_name { + Ok(name) => { + let entry = dir_entry_dict( + &path, + &name, + metadata.as_ref(), + call_span, + long, + du, + &signals_clone, + use_mime_type, + args.full_paths, + ); + match entry { + Ok(value) => Some(value), + Err(err) => Some(Value::error(err, call_span)), + } + } Err(err) => Some(Value::error(err, call_span)), } } Err(err) => Some(Value::error(err, call_span)), - } - } - Err(err) => Some(Value::error(err, call_span)), - }))) + }) + .try_for_each(|stream| { + tx.send(stream).map_err(|e| ShellError::GenericError { + error: "Error streaming data".into(), + msg: e.to_string(), + span: Some(call_span), + help: None, + inner: vec![], + }) + }) + }) + .map_err(|err| ShellError::GenericError { + error: "Unable to create a rayon pool".into(), + msg: err.to_string(), + span: Some(call_span), + help: None, + inner: vec![], + })?; + + Ok(rx + .into_iter() + .into_pipeline_data(call_span, signals.clone())) } fn permission_denied(dir: impl AsRef) -> bool {