From 219b44a04f0d22adfd0c733158a8228571723f5a Mon Sep 17 00:00:00 2001 From: Jack Wright <56345+ayax79@users.noreply.github.com> Date: Sat, 14 Dec 2024 16:36:01 -0800 Subject: [PATCH] Improve handling of columns with null values (#14588) Addresses some null handling issues in #6882 # Description This changes the implementation of guessing a column type when a schema is not specified. New behavior: 1. Use the first non-Value::Nothing value type for the columns data type 2. If the value type changes (ignoring Value::Nothing) in subsequent values, the datatype will be changed to DataType::Object("Value", None) 3. If a column type does not have a value type, DataType::Object("Value", None) will be assumed. --- .../values/nu_dataframe/conversion.rs | 547 ++++++++---------- 1 file changed, 256 insertions(+), 291 deletions(-) diff --git a/crates/nu_plugin_polars/src/dataframe/values/nu_dataframe/conversion.rs b/crates/nu_plugin_polars/src/dataframe/values/nu_dataframe/conversion.rs index 7d5a4ace3f..d196ed8ba6 100644 --- a/crates/nu_plugin_polars/src/dataframe/values/nu_dataframe/conversion.rs +++ b/crates/nu_plugin_polars/src/dataframe/values/nu_dataframe/conversion.rs @@ -223,57 +223,30 @@ pub fn insert_value( inner: vec![], }) } - } - // Checking that the type for the value is the same - // for the previous value in the column - else if col_val.values.is_empty() { - if let Some(schema) = maybe_schema { - if let Some(field) = schema.schema.get_field(&key) { - col_val.column_type = Some(field.dtype().clone()); - } - } - + } else { + let current_data_type = value_to_data_type(&value); if col_val.column_type.is_none() { - col_val.column_type = Some(value_to_data_type(&value)); + col_val.column_type = value_to_data_type(&value); + } else if let Some(current_data_type) = current_data_type { + if col_val.column_type.as_ref() != Some(¤t_data_type) { + col_val.column_type = Some(DataType::Object("Value", None)); + } } col_val.values.push(value); Ok(()) - } else { - let prev_value = &col_val.values[col_val.values.len() - 1]; - - match (&prev_value, &value) { - (Value::Int { .. }, Value::Int { .. }) - | (Value::Float { .. }, Value::Float { .. }) - | (Value::String { .. }, Value::String { .. }) - | (Value::Bool { .. }, Value::Bool { .. }) - | (Value::Date { .. }, Value::Date { .. }) - | (Value::Filesize { .. }, Value::Filesize { .. }) - | (Value::Binary { .. }, Value::Binary { .. }) - | (Value::Duration { .. }, Value::Duration { .. }) => col_val.values.push(value), - (_, Value::Nothing { .. }) => col_val.values.push(value), - (Value::List { .. }, _) => { - col_val.column_type = Some(value_to_data_type(&value)); - col_val.values.push(value); - } - _ => { - col_val.column_type = Some(DataType::Object("Value", None)); - col_val.values.push(value); - } - } - Ok(()) } } -fn value_to_data_type(value: &Value) -> DataType { +fn value_to_data_type(value: &Value) -> Option { match &value { - Value::Int { .. } => DataType::Int64, - Value::Float { .. } => DataType::Float64, - Value::String { .. } => DataType::String, - Value::Bool { .. } => DataType::Boolean, - Value::Date { .. } => DataType::Date, - Value::Duration { .. } => DataType::Duration(TimeUnit::Nanoseconds), - Value::Filesize { .. } => DataType::Int64, - Value::Binary { .. } => DataType::Binary, + Value::Int { .. } => Some(DataType::Int64), + Value::Float { .. } => Some(DataType::Float64), + Value::String { .. } => Some(DataType::String), + Value::Bool { .. } => Some(DataType::Boolean), + Value::Date { .. } => Some(DataType::Date), + Value::Duration { .. } => Some(DataType::Duration(TimeUnit::Nanoseconds)), + Value::Filesize { .. } => Some(DataType::Int64), + Value::Binary { .. } => Some(DataType::Binary), Value::List { vals, .. } => { // We need to determined the type inside of the list. // Since Value::List does not have any kind of @@ -286,243 +259,247 @@ fn value_to_data_type(value: &Value) -> DataType { .filter(|v| !matches!(v, Value::Nothing { .. })) .map(value_to_data_type) .nth(1) + .flatten() .unwrap_or(DataType::Object("Value", None)); - DataType::List(Box::new(list_type)) + Some(DataType::List(Box::new(list_type))) } - _ => DataType::Object("Value", None), + _ => None, } } fn typed_column_to_series(name: PlSmallStr, column: TypedColumn) -> Result { - if let Some(column_type) = &column.column_type { - match column_type { - DataType::Float32 => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| { - value_to_option(v, |v| match v { - Value::Float { val, .. } => Ok(*val as f32), - Value::Int { val, .. } => Ok(*val as f32), - x => Err(ShellError::GenericError { - error: "Error converting to f32".into(), - msg: "".into(), - span: None, - help: Some(format!("Unexpected type: {x:?}")), - inner: vec![], - }), - }) + let column_type = &column + .column_type + .clone() + .unwrap_or(DataType::Object("Value", None)); + match column_type { + DataType::Float32 => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| { + value_to_option(v, |v| match v { + Value::Float { val, .. } => Ok(*val as f32), + Value::Int { val, .. } => Ok(*val as f32), + x => Err(ShellError::GenericError { + error: "Error converting to f32".into(), + msg: "".into(), + span: None, + help: Some(format!("Unexpected type: {x:?}")), + inner: vec![], + }), }) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::Float64 => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| { - value_to_option(v, |v| match v { - Value::Float { val, .. } => Ok(*val), - Value::Int { val, .. } => Ok(*val as f64), - x => Err(ShellError::GenericError { - error: "Error converting to f64".into(), - msg: "".into(), - span: None, - help: Some(format!("Unexpected type: {x:?}")), - inner: vec![], - }), - }) + }) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::Float64 => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| { + value_to_option(v, |v| match v { + Value::Float { val, .. } => Ok(*val), + Value::Int { val, .. } => Ok(*val as f64), + x => Err(ShellError::GenericError { + error: "Error converting to f64".into(), + msg: "".into(), + span: None, + help: Some(format!("Unexpected type: {x:?}")), + inner: vec![], + }), }) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::UInt8 => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u8))) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::UInt16 => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u16))) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::UInt32 => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u32))) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::UInt64 => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u64))) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::Int8 => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as i8))) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::Int16 => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as i16))) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::Int32 => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as i32))) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::Int64 => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| value_to_option(v, value_to_int)) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::Boolean => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| value_to_option(v, |v| v.as_bool())) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::String => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| value_to_option(v, |v| v.coerce_string())) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::Binary | DataType::BinaryOffset => { - let series_values: Result, _> = - column.values.iter().map(|v| v.coerce_binary()).collect(); - Ok(Series::new(name, series_values?)) - } - DataType::Object(_, _) => value_to_series(name, &column.values), - DataType::Duration(time_unit) => { - let series_values: Result, _> = column - .values - .iter() - .map(|v| { - value_to_option(v, |v| { - v.as_duration().map(|v| nanos_from_timeunit(v, *time_unit)) - }) + }) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::UInt8 => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u8))) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::UInt16 => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u16))) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::UInt32 => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u32))) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::UInt64 => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u64))) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::Int8 => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as i8))) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::Int16 => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as i16))) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::Int32 => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as i32))) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::Int64 => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| value_to_option(v, value_to_int)) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::Boolean => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| value_to_option(v, |v| v.as_bool())) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::String => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| value_to_option(v, |v| v.coerce_string())) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::Binary | DataType::BinaryOffset => { + let series_values: Result, _> = + column.values.iter().map(|v| v.coerce_binary()).collect(); + Ok(Series::new(name, series_values?)) + } + DataType::Object(_, _) => value_to_series(name, &column.values), + DataType::Duration(time_unit) => { + let series_values: Result, _> = column + .values + .iter() + .map(|v| { + value_to_option(v, |v| { + v.as_duration().map(|v| nanos_from_timeunit(v, *time_unit)) }) - .collect(); - Ok(Series::new(name, series_values?)) - } - DataType::List(list_type) => { - match input_type_list_to_series(&name, list_type.as_ref(), &column.values) { - Ok(series) => Ok(series), - Err(_) => { - // An error case will occur when there are lists of mixed types. - // If this happens, fallback to object list - input_type_list_to_series( - &name, - &DataType::Object("unknown", None), - &column.values, - ) - } + }) + .collect(); + Ok(Series::new(name, series_values?)) + } + DataType::List(list_type) => { + match input_type_list_to_series(&name, list_type.as_ref(), &column.values) { + Ok(series) => Ok(series), + Err(_) => { + // An error case will occur when there are lists of mixed types. + // If this happens, fallback to object list + input_type_list_to_series( + &name, + &DataType::Object("unknown", None), + &column.values, + ) } } - DataType::Date => { - let it = column.values.iter().map(|v| { + } + DataType::Date => { + let it = column.values.iter().map(|v| { + if let Value::Date { val, .. } = &v { + Some(val.timestamp_nanos_opt().unwrap_or_default()) + } else { + None + } + }); + + let res: DatetimeChunked = ChunkedArray::::from_iter_options(name, it) + .into_datetime(TimeUnit::Nanoseconds, None); + + Ok(res.into_series()) + } + DataType::Datetime(tu, maybe_tz) => { + let dates = column + .values + .iter() + .map(|v| { if let Value::Date { val, .. } = &v { - Some(val.timestamp_nanos_opt().unwrap_or_default()) - } else { - None - } - }); - - let res: DatetimeChunked = ChunkedArray::::from_iter_options(name, it) - .into_datetime(TimeUnit::Nanoseconds, None); - - Ok(res.into_series()) - } - DataType::Datetime(tu, maybe_tz) => { - let dates = column - .values - .iter() - .map(|v| { - if let Value::Date { val, .. } = &v { - // If there is a timezone specified, make sure - // the value is converted to it - Ok(maybe_tz - .as_ref() - .map(|tz| tz.parse::().map(|tz| val.with_timezone(&tz))) - .transpose() - .map_err(|e| ShellError::GenericError { - error: "Error parsing timezone".into(), - msg: "".into(), - span: None, - help: Some(e.to_string()), - inner: vec![], - })? - .and_then(|dt| dt.timestamp_nanos_opt()) - .map(|nanos| nanos_from_timeunit(nanos, *tu))) - } else { - Ok(None) - } - }) - .collect::>, ShellError>>()?; - - let res: DatetimeChunked = - ChunkedArray::::from_iter_options(name, dates.into_iter()) - .into_datetime(*tu, maybe_tz.clone()); - - Ok(res.into_series()) - } - DataType::Struct(fields) => { - let schema = Some(NuSchema::new(Schema::from_iter(fields.clone()))); - // let mut structs: Vec = Vec::new(); - let mut structs: HashMap = HashMap::new(); - - for v in column.values.iter() { - let mut column_values: ColumnMap = IndexMap::new(); - let record = v.as_record()?; - insert_record(&mut column_values, record.clone(), &schema)?; - let df = from_parsed_columns(column_values)?; - for name in df.df.get_column_names() { - let series = df - .df - .column(name) + // If there is a timezone specified, make sure + // the value is converted to it + Ok(maybe_tz + .as_ref() + .map(|tz| tz.parse::().map(|tz| val.with_timezone(&tz))) + .transpose() .map_err(|e| ShellError::GenericError { - error: format!( - "Error creating struct, could not get column name {name}: {e}" - ), + error: "Error parsing timezone".into(), msg: "".into(), span: None, - help: None, + help: Some(e.to_string()), inner: vec![], })? - .as_materialized_series(); + .and_then(|dt| dt.timestamp_nanos_opt()) + .map(|nanos| nanos_from_timeunit(nanos, *tu))) + } else { + Ok(None) + } + }) + .collect::>, ShellError>>()?; - if let Some(v) = structs.get_mut(name) { - let _ = v.append(series) + let res: DatetimeChunked = + ChunkedArray::::from_iter_options(name, dates.into_iter()) + .into_datetime(*tu, maybe_tz.clone()); + + Ok(res.into_series()) + } + DataType::Struct(fields) => { + let schema = Some(NuSchema::new(Schema::from_iter(fields.clone()))); + // let mut structs: Vec = Vec::new(); + let mut structs: HashMap = HashMap::new(); + + for v in column.values.iter() { + let mut column_values: ColumnMap = IndexMap::new(); + let record = v.as_record()?; + insert_record(&mut column_values, record.clone(), &schema)?; + let df = from_parsed_columns(column_values)?; + for name in df.df.get_column_names() { + let series = df + .df + .column(name) + .map_err(|e| ShellError::GenericError { + error: format!( + "Error creating struct, could not get column name {name}: {e}" + ), + msg: "".into(), + span: None, + help: None, + inner: vec![], + })? + .as_materialized_series(); + + if let Some(v) = structs.get_mut(name) { + let _ = v.append(series) .map_err(|e| ShellError::GenericError { error: format!("Error creating struct, could not append to series for col {name}: {e}"), msg: "".into(), @@ -530,44 +507,32 @@ fn typed_column_to_series(name: PlSmallStr, column: TypedColumn) -> Result = structs.into_values().collect(); - - let chunked = StructChunked::from_series( - column.name().to_owned(), - structs.len(), - structs.iter(), - ) - .map_err(|e| ShellError::GenericError { - error: format!("Error creating struct: {e}"), - msg: "".into(), - span: None, - help: None, - inner: vec![], - })?; - Ok(chunked.into_series()) } - _ => Err(ShellError::GenericError { - error: format!("Error creating dataframe: Unsupported type: {column_type:?}"), - msg: "".into(), - span: None, - help: None, - inner: vec![], - }), + + let structs: Vec = structs.into_values().collect(); + + let chunked = + StructChunked::from_series(column.name().to_owned(), structs.len(), structs.iter()) + .map_err(|e| ShellError::GenericError { + error: format!("Error creating struct: {e}"), + msg: "".into(), + span: None, + help: None, + inner: vec![], + })?; + Ok(chunked.into_series()) } - } else { - Err(ShellError::GenericError { - error: "Passed a type column with no type".into(), + _ => Err(ShellError::GenericError { + error: format!("Error creating dataframe: Unsupported type: {column_type:?}"), msg: "".into(), span: None, help: None, inner: vec![], - }) + }), } }