mirror of
https://github.com/nushell/nushell
synced 2024-12-26 13:03:07 +00:00
Benchmark each pipeline element (#7854)
# Description Adds a `profile` command that profiles each pipeline element of a block and can also recursively step into child blocks. # Limitations * It is implemented using pipeline metadata which currently get lost in some circumstances (e.g., https://github.com/nushell/nushell/issues/4501). This means that the profiler will lose data coming from subexpressions. This issue will hopefully be solved in the future. * It also does not step into individual loop iteration which I'm not sure why but maybe that's a good thing. # User-Facing Changes Shouldn't change any existing behavior. # Tests + Formatting Don't forget to add tests that cover your changes. Make sure you've run and fixed any issues with these commands: - `cargo fmt --all -- --check` to check standard code formatting (`cargo fmt --all` applies these changes) - `cargo clippy --workspace -- -D warnings -D clippy::unwrap_used -A clippy::needless_collect` to check that you're using the standard code style - `cargo test --workspace` to check that all tests pass # After Submitting If your PR had any user-facing changes, update [the documentation](https://github.com/nushell/nushell.github.io) after the PR is merged, if necessary. This will help us keep the docs up to date. --------- Co-authored-by: Darren Schroeder <343840+fdncred@users.noreply.github.com>
This commit is contained in:
parent
64b6c02a22
commit
58529aa0b2
12 changed files with 328 additions and 25 deletions
|
@ -75,7 +75,7 @@ impl Command for Metadata {
|
|||
None => {
|
||||
let mut cols = vec![];
|
||||
let mut vals = vec![];
|
||||
if let Some(x) = &input.metadata() {
|
||||
if let Some(x) = input.metadata().as_deref() {
|
||||
match x {
|
||||
PipelineMetadata {
|
||||
data_source: DataSource::Ls,
|
||||
|
@ -89,6 +89,12 @@ impl Command for Metadata {
|
|||
cols.push("source".into());
|
||||
vals.push(Value::string("into html --list", head))
|
||||
}
|
||||
PipelineMetadata {
|
||||
data_source: DataSource::Profiling(values),
|
||||
} => {
|
||||
cols.push("profiling".into());
|
||||
vals.push(Value::list(values.clone(), head))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -118,7 +124,11 @@ impl Command for Metadata {
|
|||
}
|
||||
}
|
||||
|
||||
fn build_metadata_record(arg: &Value, metadata: &Option<PipelineMetadata>, head: Span) -> Value {
|
||||
fn build_metadata_record(
|
||||
arg: &Value,
|
||||
metadata: &Option<Box<PipelineMetadata>>,
|
||||
head: Span,
|
||||
) -> Value {
|
||||
let mut cols = vec![];
|
||||
let mut vals = vec![];
|
||||
|
||||
|
@ -140,7 +150,7 @@ fn build_metadata_record(arg: &Value, metadata: &Option<PipelineMetadata>, head:
|
|||
});
|
||||
}
|
||||
|
||||
if let Some(x) = &metadata {
|
||||
if let Some(x) = metadata.as_deref() {
|
||||
match x {
|
||||
PipelineMetadata {
|
||||
data_source: DataSource::Ls,
|
||||
|
@ -154,6 +164,12 @@ fn build_metadata_record(arg: &Value, metadata: &Option<PipelineMetadata>, head:
|
|||
cols.push("source".into());
|
||||
vals.push(Value::string("into html --list", head))
|
||||
}
|
||||
PipelineMetadata {
|
||||
data_source: DataSource::Profiling(values),
|
||||
} => {
|
||||
cols.push("profiling".into());
|
||||
vals.push(Value::list(values.clone(), head))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -475,6 +475,7 @@ pub fn create_default_context() -> EngineState {
|
|||
// Experimental
|
||||
bind_command! {
|
||||
IsAdmin,
|
||||
Profile,
|
||||
View,
|
||||
ViewFiles,
|
||||
ViewSource,
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
mod is_admin;
|
||||
mod profile;
|
||||
mod view;
|
||||
mod view_files;
|
||||
mod view_source;
|
||||
mod view_span;
|
||||
|
||||
pub use is_admin::IsAdmin;
|
||||
pub use profile::Profile;
|
||||
pub use view::View;
|
||||
pub use view_files::ViewFiles;
|
||||
pub use view_source::ViewSource;
|
||||
|
|
113
crates/nu-command/src/experimental/profile.rs
Normal file
113
crates/nu-command/src/experimental/profile.rs
Normal file
|
@ -0,0 +1,113 @@
|
|||
use nu_engine::{eval_block, CallExt};
|
||||
use nu_protocol::ast::Call;
|
||||
use nu_protocol::engine::{Closure, Command, EngineState, ProfilingConfig, Stack};
|
||||
use nu_protocol::{
|
||||
Category, DataSource, Example, IntoPipelineData, PipelineData, PipelineMetadata, Signature,
|
||||
Spanned, SyntaxShape, Type, Value,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Profile;
|
||||
|
||||
impl Command for Profile {
|
||||
fn name(&self) -> &str {
|
||||
"profile"
|
||||
}
|
||||
|
||||
fn usage(&self) -> &str {
|
||||
"Profile each pipeline element in a closure."
|
||||
}
|
||||
|
||||
fn extra_usage(&self) -> &str {
|
||||
r#"The command collects run time of every pipeline element, recursively stepping into child closures
|
||||
until a maximum depth. Optionally, it also collects the source code and intermediate values.
|
||||
|
||||
Current known limitations are:
|
||||
* profiling data from subexpressions is not tracked
|
||||
* it does not step into loop iterations"#
|
||||
}
|
||||
|
||||
fn signature(&self) -> nu_protocol::Signature {
|
||||
Signature::build("profile")
|
||||
.required(
|
||||
"closure",
|
||||
SyntaxShape::Closure(Some(vec![SyntaxShape::Any])),
|
||||
"the closure to run",
|
||||
)
|
||||
.switch("source", "Collect source code in the report", None)
|
||||
.switch("values", "Collect values in the report", None)
|
||||
.named(
|
||||
"max-depth",
|
||||
SyntaxShape::Int,
|
||||
"How many levels of blocks to step into (default: 1)",
|
||||
Some('d'),
|
||||
)
|
||||
.input_output_types(vec![(Type::Any, Type::Table(vec![]))])
|
||||
.allow_variants_without_examples(true)
|
||||
.category(Category::Debug)
|
||||
}
|
||||
|
||||
fn run(
|
||||
&self,
|
||||
engine_state: &EngineState,
|
||||
stack: &mut Stack,
|
||||
call: &Call,
|
||||
input: PipelineData,
|
||||
) -> Result<nu_protocol::PipelineData, nu_protocol::ShellError> {
|
||||
let capture_block: Spanned<Closure> = call.req(engine_state, stack, 0)?;
|
||||
let block = engine_state.get_block(capture_block.item.block_id);
|
||||
|
||||
let redirect_stdout = call.redirect_stdout;
|
||||
let redirect_stderr = call.redirect_stderr;
|
||||
|
||||
let mut stack = stack.captures_to_stack(&capture_block.item.captures);
|
||||
|
||||
let input_val = input.into_value(call.head);
|
||||
|
||||
if let Some(var) = block.signature.get_positional(0) {
|
||||
if let Some(var_id) = &var.var_id {
|
||||
stack.add_var(*var_id, input_val.clone());
|
||||
}
|
||||
}
|
||||
|
||||
stack.profiling_config = ProfilingConfig::new(
|
||||
call.get_flag::<i64>(engine_state, &mut stack, "max-depth")?
|
||||
.unwrap_or(1),
|
||||
call.has_flag("source"),
|
||||
call.has_flag("values"),
|
||||
);
|
||||
|
||||
let profiling_metadata = Box::new(PipelineMetadata {
|
||||
data_source: DataSource::Profiling(vec![]),
|
||||
});
|
||||
|
||||
let result = if let Some(PipelineMetadata {
|
||||
data_source: DataSource::Profiling(values),
|
||||
}) = eval_block(
|
||||
engine_state,
|
||||
&mut stack,
|
||||
block,
|
||||
input_val.into_pipeline_data_with_metadata(profiling_metadata),
|
||||
redirect_stdout,
|
||||
redirect_stderr,
|
||||
)?
|
||||
.metadata()
|
||||
.map(|m| *m)
|
||||
{
|
||||
Value::list(values, call.head)
|
||||
} else {
|
||||
Value::nothing(call.head)
|
||||
};
|
||||
|
||||
Ok(result.into_pipeline_data())
|
||||
}
|
||||
|
||||
fn examples(&self) -> Vec<Example> {
|
||||
vec![Example {
|
||||
description:
|
||||
"Profile some code, stepping into the `spam` command and collecting source.",
|
||||
example: r#"def spam [] { "spam" }; profile { spam | str length } -d 2 --source"#,
|
||||
result: None,
|
||||
}]
|
||||
}
|
||||
}
|
|
@ -264,9 +264,9 @@ impl Command for Ls {
|
|||
_ => Some(Value::Nothing { span: call_span }),
|
||||
})
|
||||
.into_pipeline_data_with_metadata(
|
||||
PipelineMetadata {
|
||||
Box::new(PipelineMetadata {
|
||||
data_source: DataSource::Ls,
|
||||
},
|
||||
}),
|
||||
engine_state.ctrlc.clone(),
|
||||
))
|
||||
}
|
||||
|
|
|
@ -264,7 +264,7 @@ pub fn uniq(
|
|||
call: &Call,
|
||||
input: Vec<Value>,
|
||||
item_mapper: Box<dyn Fn(ItemMapperState) -> ValueCounter>,
|
||||
metadata: Option<PipelineMetadata>,
|
||||
metadata: Option<Box<PipelineMetadata>>,
|
||||
) -> Result<PipelineData, ShellError> {
|
||||
let ctrlc = engine_state.ctrlc.clone();
|
||||
let head = call.head;
|
||||
|
|
|
@ -320,9 +320,9 @@ fn to_html(
|
|||
vals: result,
|
||||
span: head,
|
||||
}
|
||||
.into_pipeline_data_with_metadata(PipelineMetadata {
|
||||
.into_pipeline_data_with_metadata(Box::new(PipelineMetadata {
|
||||
data_source: DataSource::HtmlThemes,
|
||||
}));
|
||||
})));
|
||||
} else {
|
||||
let theme_span = match &theme {
|
||||
Some(v) => v.span,
|
||||
|
|
|
@ -612,9 +612,9 @@ fn handle_row_stream(
|
|||
call: &Call,
|
||||
row_offset: usize,
|
||||
ctrlc: Option<Arc<AtomicBool>>,
|
||||
metadata: Option<PipelineMetadata>,
|
||||
metadata: Option<Box<PipelineMetadata>>,
|
||||
) -> Result<PipelineData, ShellError> {
|
||||
let stream = match metadata {
|
||||
let stream = match metadata.as_deref() {
|
||||
// First, `ls` sources:
|
||||
Some(PipelineMetadata {
|
||||
data_source: DataSource::Ls,
|
||||
|
|
|
@ -6,11 +6,12 @@ use nu_protocol::{
|
|||
Operator, PathMember, PipelineElement, Redirection,
|
||||
},
|
||||
engine::{EngineState, Stack},
|
||||
Config, IntoInterruptiblePipelineData, IntoPipelineData, PipelineData, Range, ShellError, Span,
|
||||
Spanned, Unit, Value, VarId, ENV_VARIABLE_ID,
|
||||
Config, DataSource, IntoInterruptiblePipelineData, IntoPipelineData, PipelineData,
|
||||
PipelineMetadata, Range, ShellError, Span, Spanned, Unit, Value, VarId, ENV_VARIABLE_ID,
|
||||
};
|
||||
use nu_utils::stdout_write_all_and_flush;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Instant;
|
||||
|
||||
pub fn eval_operator(op: &Expression) -> Result<Operator, ShellError> {
|
||||
match op {
|
||||
|
@ -989,7 +990,16 @@ pub fn eval_block(
|
|||
*stack.recursion_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let num_pipelines = block.len();
|
||||
|
||||
let mut input_metadata = if stack.profiling_config.should_debug() {
|
||||
stack.profiling_config.enter_block();
|
||||
input.metadata()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
for (pipeline_idx, pipeline) in block.pipelines.iter().enumerate() {
|
||||
let mut i = 0;
|
||||
|
||||
|
@ -1003,6 +1013,12 @@ pub fn eval_block(
|
|||
| PipelineElement::SeparateRedirection { .. }
|
||||
)));
|
||||
|
||||
let start_time = if stack.profiling_config.should_debug() {
|
||||
Some(Instant::now())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// if eval internal command failed, it can just make early return with `Err(ShellError)`.
|
||||
let eval_result = eval_element_with_input(
|
||||
engine_state,
|
||||
|
@ -1021,6 +1037,106 @@ pub fn eval_block(
|
|||
redirect_stderr,
|
||||
);
|
||||
|
||||
let end_time = if stack.profiling_config.should_debug() {
|
||||
Some(Instant::now())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if let (Some(start_time), Some(end_time), Some(input_metadata)) =
|
||||
(start_time, end_time, input_metadata.as_deref_mut())
|
||||
{
|
||||
let span = pipeline.elements[i].span();
|
||||
let element_str = Value::string(
|
||||
String::from_utf8_lossy(
|
||||
engine_state.get_span_contents(&pipeline.elements[i].span()),
|
||||
),
|
||||
span,
|
||||
);
|
||||
let time_ns = (end_time - start_time).as_nanos() as i64;
|
||||
|
||||
let mut cols = vec![
|
||||
"pipeline_idx".to_string(),
|
||||
"element_idx".to_string(),
|
||||
"depth".to_string(),
|
||||
"span".to_string(),
|
||||
];
|
||||
|
||||
let mut vals = vec![
|
||||
Value::int(pipeline_idx as i64, span),
|
||||
Value::int(i as i64, span),
|
||||
Value::int(stack.profiling_config.depth, span),
|
||||
Value::record(
|
||||
vec!["start".to_string(), "end".to_string()],
|
||||
vec![
|
||||
Value::int(span.start as i64, span),
|
||||
Value::int(span.end as i64, span),
|
||||
],
|
||||
span,
|
||||
),
|
||||
];
|
||||
|
||||
if stack.profiling_config.collect_source {
|
||||
cols.push("source".to_string());
|
||||
vals.push(element_str.clone());
|
||||
}
|
||||
|
||||
if stack.profiling_config.collect_values {
|
||||
let value = match &eval_result {
|
||||
Ok((PipelineData::Value(val, ..), ..)) => val.clone(),
|
||||
Ok((PipelineData::ListStream(..), ..)) => {
|
||||
Value::string("list stream", span)
|
||||
}
|
||||
Ok((PipelineData::ExternalStream { .. }, ..)) => {
|
||||
Value::string("raw stream", span)
|
||||
}
|
||||
Ok((PipelineData::Empty, ..)) => Value::Nothing { span },
|
||||
Err(err) => Value::Error { error: err.clone() },
|
||||
};
|
||||
|
||||
cols.push("value".to_string());
|
||||
vals.push(value);
|
||||
}
|
||||
|
||||
cols.push("time".to_string());
|
||||
vals.push(Value::Duration { val: time_ns, span });
|
||||
|
||||
let record = Value::Record { cols, vals, span };
|
||||
|
||||
let element_metadata = if let Ok((pipeline_data, ..)) = &eval_result {
|
||||
pipeline_data.metadata()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if let PipelineMetadata {
|
||||
data_source: DataSource::Profiling(tgt_vals),
|
||||
} = input_metadata
|
||||
{
|
||||
tgt_vals.push(record);
|
||||
} else {
|
||||
*input_metadata = PipelineMetadata {
|
||||
data_source: DataSource::Profiling(vec![record]),
|
||||
};
|
||||
}
|
||||
|
||||
if let Some(PipelineMetadata {
|
||||
data_source: DataSource::Profiling(element_vals),
|
||||
}) = element_metadata.map(|m| *m)
|
||||
{
|
||||
if let PipelineMetadata {
|
||||
data_source: DataSource::Profiling(tgt_vals),
|
||||
} = input_metadata
|
||||
{
|
||||
tgt_vals.extend(element_vals);
|
||||
} else {
|
||||
*input_metadata = PipelineMetadata {
|
||||
data_source: DataSource::Profiling(element_vals),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match (eval_result, redirect_stderr) {
|
||||
(Ok((pipeline_data, _)), true) => {
|
||||
input = pipeline_data;
|
||||
|
@ -1037,6 +1153,9 @@ pub fn eval_block(
|
|||
// make early return so remaining commands will not be executed.
|
||||
// don't return `Err(ShellError)`, so nushell wouldn't show extra error message.
|
||||
if output.1 {
|
||||
if stack.profiling_config.should_debug() {
|
||||
stack.profiling_config.leave_block();
|
||||
}
|
||||
return Ok(input);
|
||||
}
|
||||
}
|
||||
|
@ -1120,7 +1239,12 @@ pub fn eval_block(
|
|||
}
|
||||
}
|
||||
|
||||
Ok(input)
|
||||
if stack.profiling_config.should_debug() {
|
||||
stack.profiling_config.leave_block();
|
||||
Ok(input.set_metadata(input_metadata))
|
||||
} else {
|
||||
Ok(input)
|
||||
}
|
||||
}
|
||||
|
||||
fn print_or_return(pipeline_data: PipelineData, config: &Config) -> Result<(), ShellError> {
|
||||
|
|
|
@ -17,7 +17,7 @@ pub fn collect_pipeline(input: PipelineData) -> (Vec<String>, Vec<Vec<Value>>) {
|
|||
metadata,
|
||||
span,
|
||||
..
|
||||
} => collect_external_stream(stdout, stderr, exit_code, metadata, span),
|
||||
} => collect_external_stream(stdout, stderr, exit_code, metadata.map(|m| *m), span),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,44 @@ use crate::{ShellError, Span, Value, VarId};
|
|||
/// Environment variables per overlay
|
||||
pub type EnvVars = HashMap<String, HashMap<String, Value>>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProfilingConfig {
|
||||
pub max_depth: i64,
|
||||
pub depth: i64,
|
||||
pub collect_source: bool,
|
||||
pub collect_values: bool,
|
||||
}
|
||||
|
||||
impl ProfilingConfig {
|
||||
pub fn new(max_depth: i64, collect_source: bool, collect_values: bool) -> Self {
|
||||
ProfilingConfig {
|
||||
max_depth,
|
||||
depth: 0,
|
||||
collect_source,
|
||||
collect_values,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn enter_block(&mut self) {
|
||||
self.depth += 1;
|
||||
}
|
||||
|
||||
pub fn leave_block(&mut self) {
|
||||
self.depth -= 1;
|
||||
}
|
||||
|
||||
pub fn should_debug(&self) -> bool {
|
||||
self.depth <= self.max_depth
|
||||
}
|
||||
|
||||
pub fn reset(&mut self) {
|
||||
self.max_depth = 0;
|
||||
self.depth = 0;
|
||||
self.collect_source = false;
|
||||
self.collect_values = false;
|
||||
}
|
||||
}
|
||||
|
||||
/// A runtime value stack used during evaluation
|
||||
///
|
||||
/// A note on implementation:
|
||||
|
@ -35,6 +73,7 @@ pub struct Stack {
|
|||
/// List of active overlays
|
||||
pub active_overlays: Vec<String>,
|
||||
pub recursion_count: Box<u64>,
|
||||
pub profiling_config: ProfilingConfig,
|
||||
}
|
||||
|
||||
impl Stack {
|
||||
|
@ -45,6 +84,7 @@ impl Stack {
|
|||
env_hidden: HashMap::new(),
|
||||
active_overlays: vec![DEFAULT_OVERLAY_NAME.to_string()],
|
||||
recursion_count: Box::new(0),
|
||||
profiling_config: ProfilingConfig::new(0, false, false),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -126,6 +166,7 @@ impl Stack {
|
|||
env_hidden: HashMap::new(),
|
||||
active_overlays: self.active_overlays.clone(),
|
||||
recursion_count: self.recursion_count.to_owned(),
|
||||
profiling_config: self.profiling_config.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -151,6 +192,7 @@ impl Stack {
|
|||
env_hidden: HashMap::new(),
|
||||
active_overlays: self.active_overlays.clone(),
|
||||
recursion_count: self.recursion_count.to_owned(),
|
||||
profiling_config: self.profiling_config.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -40,14 +40,16 @@ const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
|
|||
/// Nushell.
|
||||
#[derive(Debug)]
|
||||
pub enum PipelineData {
|
||||
Value(Value, Option<PipelineMetadata>),
|
||||
ListStream(ListStream, Option<PipelineMetadata>),
|
||||
// Note: the PipelineMetadata is boxed everywhere because the DataSource::Profiling caused
|
||||
// stack overflow on Windows CI when testing virtualenv
|
||||
Value(Value, Option<Box<PipelineMetadata>>),
|
||||
ListStream(ListStream, Option<Box<PipelineMetadata>>),
|
||||
ExternalStream {
|
||||
stdout: Option<RawStream>,
|
||||
stderr: Option<RawStream>,
|
||||
exit_code: Option<ListStream>,
|
||||
span: Span,
|
||||
metadata: Option<PipelineMetadata>,
|
||||
metadata: Option<Box<PipelineMetadata>>,
|
||||
trim_end_newline: bool,
|
||||
},
|
||||
Empty,
|
||||
|
@ -62,10 +64,11 @@ pub struct PipelineMetadata {
|
|||
pub enum DataSource {
|
||||
Ls,
|
||||
HtmlThemes,
|
||||
Profiling(Vec<Value>),
|
||||
}
|
||||
|
||||
impl PipelineData {
|
||||
pub fn new_with_metadata(metadata: Option<PipelineMetadata>, span: Span) -> PipelineData {
|
||||
pub fn new_with_metadata(metadata: Option<Box<PipelineMetadata>>, span: Span) -> PipelineData {
|
||||
PipelineData::Value(Value::Nothing { span }, metadata)
|
||||
}
|
||||
|
||||
|
@ -73,7 +76,7 @@ impl PipelineData {
|
|||
PipelineData::Empty
|
||||
}
|
||||
|
||||
pub fn metadata(&self) -> Option<PipelineMetadata> {
|
||||
pub fn metadata(&self) -> Option<Box<PipelineMetadata>> {
|
||||
match self {
|
||||
PipelineData::ListStream(_, x) => x.clone(),
|
||||
PipelineData::ExternalStream { metadata: x, .. } => x.clone(),
|
||||
|
@ -82,7 +85,7 @@ impl PipelineData {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
|
||||
pub fn set_metadata(mut self, metadata: Option<Box<PipelineMetadata>>) -> Self {
|
||||
match &mut self {
|
||||
PipelineData::ListStream(_, x) => *x = metadata,
|
||||
PipelineData::ExternalStream { metadata: x, .. } => *x = metadata,
|
||||
|
@ -284,7 +287,7 @@ impl PipelineData {
|
|||
pub fn collect_string_strict(
|
||||
self,
|
||||
span: Span,
|
||||
) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
|
||||
) -> Result<(String, Span, Option<Box<PipelineMetadata>>), ShellError> {
|
||||
match self {
|
||||
PipelineData::Empty => Ok((String::new(), span, None)),
|
||||
PipelineData::Value(Value::String { val, span }, metadata) => Ok((val, span, metadata)),
|
||||
|
@ -809,9 +812,10 @@ impl Iterator for PipelineIterator {
|
|||
|
||||
pub trait IntoPipelineData {
|
||||
fn into_pipeline_data(self) -> PipelineData;
|
||||
|
||||
fn into_pipeline_data_with_metadata(
|
||||
self,
|
||||
metadata: impl Into<Option<PipelineMetadata>>,
|
||||
metadata: impl Into<Option<Box<PipelineMetadata>>>,
|
||||
) -> PipelineData;
|
||||
}
|
||||
|
||||
|
@ -822,9 +826,10 @@ where
|
|||
fn into_pipeline_data(self) -> PipelineData {
|
||||
PipelineData::Value(self.into(), None)
|
||||
}
|
||||
|
||||
fn into_pipeline_data_with_metadata(
|
||||
self,
|
||||
metadata: impl Into<Option<PipelineMetadata>>,
|
||||
metadata: impl Into<Option<Box<PipelineMetadata>>>,
|
||||
) -> PipelineData {
|
||||
PipelineData::Value(self.into(), metadata.into())
|
||||
}
|
||||
|
@ -834,7 +839,7 @@ pub trait IntoInterruptiblePipelineData {
|
|||
fn into_pipeline_data(self, ctrlc: Option<Arc<AtomicBool>>) -> PipelineData;
|
||||
fn into_pipeline_data_with_metadata(
|
||||
self,
|
||||
metadata: impl Into<Option<PipelineMetadata>>,
|
||||
metadata: impl Into<Option<Box<PipelineMetadata>>>,
|
||||
ctrlc: Option<Arc<AtomicBool>>,
|
||||
) -> PipelineData;
|
||||
}
|
||||
|
@ -857,7 +862,7 @@ where
|
|||
|
||||
fn into_pipeline_data_with_metadata(
|
||||
self,
|
||||
metadata: impl Into<Option<PipelineMetadata>>,
|
||||
metadata: impl Into<Option<Box<PipelineMetadata>>>,
|
||||
ctrlc: Option<Arc<AtomicBool>>,
|
||||
) -> PipelineData {
|
||||
PipelineData::ListStream(
|
||||
|
|
Loading…
Reference in a new issue