Improve handling of columns with null values (#14588)

Addresses some null handling issues in #6882

# Description

This changes the implementation of guessing a column type when a schema
is not specified.

New behavior:
1. Use the type of the first non-Value::Nothing value as the column's data type
2. If the value type changes (ignoring Value::Nothing) in subsequent
values, the datatype will be changed to DataType::Object("Value", None)
3. If no value type can be determined for a column (for example, every
value is Value::Nothing), DataType::Object("Value", None) will be assumed.
This commit is contained in:
Jack Wright 2024-12-14 16:36:01 -08:00 committed by GitHub
parent 05ee7ea9c7
commit 219b44a04f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -223,57 +223,30 @@ pub fn insert_value(
inner: vec![], inner: vec![],
}) })
} }
} } else {
// Checking that the type for the value is the same let current_data_type = value_to_data_type(&value);
// for the previous value in the column
else if col_val.values.is_empty() {
if let Some(schema) = maybe_schema {
if let Some(field) = schema.schema.get_field(&key) {
col_val.column_type = Some(field.dtype().clone());
}
}
if col_val.column_type.is_none() { if col_val.column_type.is_none() {
col_val.column_type = Some(value_to_data_type(&value)); col_val.column_type = value_to_data_type(&value);
} else if let Some(current_data_type) = current_data_type {
if col_val.column_type.as_ref() != Some(&current_data_type) {
col_val.column_type = Some(DataType::Object("Value", None));
}
} }
col_val.values.push(value); col_val.values.push(value);
Ok(()) Ok(())
} else {
let prev_value = &col_val.values[col_val.values.len() - 1];
match (&prev_value, &value) {
(Value::Int { .. }, Value::Int { .. })
| (Value::Float { .. }, Value::Float { .. })
| (Value::String { .. }, Value::String { .. })
| (Value::Bool { .. }, Value::Bool { .. })
| (Value::Date { .. }, Value::Date { .. })
| (Value::Filesize { .. }, Value::Filesize { .. })
| (Value::Binary { .. }, Value::Binary { .. })
| (Value::Duration { .. }, Value::Duration { .. }) => col_val.values.push(value),
(_, Value::Nothing { .. }) => col_val.values.push(value),
(Value::List { .. }, _) => {
col_val.column_type = Some(value_to_data_type(&value));
col_val.values.push(value);
}
_ => {
col_val.column_type = Some(DataType::Object("Value", None));
col_val.values.push(value);
}
}
Ok(())
} }
} }
fn value_to_data_type(value: &Value) -> DataType { fn value_to_data_type(value: &Value) -> Option<DataType> {
match &value { match &value {
Value::Int { .. } => DataType::Int64, Value::Int { .. } => Some(DataType::Int64),
Value::Float { .. } => DataType::Float64, Value::Float { .. } => Some(DataType::Float64),
Value::String { .. } => DataType::String, Value::String { .. } => Some(DataType::String),
Value::Bool { .. } => DataType::Boolean, Value::Bool { .. } => Some(DataType::Boolean),
Value::Date { .. } => DataType::Date, Value::Date { .. } => Some(DataType::Date),
Value::Duration { .. } => DataType::Duration(TimeUnit::Nanoseconds), Value::Duration { .. } => Some(DataType::Duration(TimeUnit::Nanoseconds)),
Value::Filesize { .. } => DataType::Int64, Value::Filesize { .. } => Some(DataType::Int64),
Value::Binary { .. } => DataType::Binary, Value::Binary { .. } => Some(DataType::Binary),
Value::List { vals, .. } => { Value::List { vals, .. } => {
// We need to determined the type inside of the list. // We need to determined the type inside of the list.
// Since Value::List does not have any kind of // Since Value::List does not have any kind of
@ -286,243 +259,247 @@ fn value_to_data_type(value: &Value) -> DataType {
.filter(|v| !matches!(v, Value::Nothing { .. })) .filter(|v| !matches!(v, Value::Nothing { .. }))
.map(value_to_data_type) .map(value_to_data_type)
.nth(1) .nth(1)
.flatten()
.unwrap_or(DataType::Object("Value", None)); .unwrap_or(DataType::Object("Value", None));
DataType::List(Box::new(list_type)) Some(DataType::List(Box::new(list_type)))
} }
_ => DataType::Object("Value", None), _ => None,
} }
} }
fn typed_column_to_series(name: PlSmallStr, column: TypedColumn) -> Result<Series, ShellError> { fn typed_column_to_series(name: PlSmallStr, column: TypedColumn) -> Result<Series, ShellError> {
if let Some(column_type) = &column.column_type { let column_type = &column
match column_type { .column_type
DataType::Float32 => { .clone()
let series_values: Result<Vec<_>, _> = column .unwrap_or(DataType::Object("Value", None));
.values match column_type {
.iter() DataType::Float32 => {
.map(|v| { let series_values: Result<Vec<_>, _> = column
value_to_option(v, |v| match v { .values
Value::Float { val, .. } => Ok(*val as f32), .iter()
Value::Int { val, .. } => Ok(*val as f32), .map(|v| {
x => Err(ShellError::GenericError { value_to_option(v, |v| match v {
error: "Error converting to f32".into(), Value::Float { val, .. } => Ok(*val as f32),
msg: "".into(), Value::Int { val, .. } => Ok(*val as f32),
span: None, x => Err(ShellError::GenericError {
help: Some(format!("Unexpected type: {x:?}")), error: "Error converting to f32".into(),
inner: vec![], msg: "".into(),
}), span: None,
}) help: Some(format!("Unexpected type: {x:?}")),
inner: vec![],
}),
}) })
.collect(); })
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::Float64 => { }
let series_values: Result<Vec<_>, _> = column DataType::Float64 => {
.values let series_values: Result<Vec<_>, _> = column
.iter() .values
.map(|v| { .iter()
value_to_option(v, |v| match v { .map(|v| {
Value::Float { val, .. } => Ok(*val), value_to_option(v, |v| match v {
Value::Int { val, .. } => Ok(*val as f64), Value::Float { val, .. } => Ok(*val),
x => Err(ShellError::GenericError { Value::Int { val, .. } => Ok(*val as f64),
error: "Error converting to f64".into(), x => Err(ShellError::GenericError {
msg: "".into(), error: "Error converting to f64".into(),
span: None, msg: "".into(),
help: Some(format!("Unexpected type: {x:?}")), span: None,
inner: vec![], help: Some(format!("Unexpected type: {x:?}")),
}), inner: vec![],
}) }),
}) })
.collect(); })
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::UInt8 => { }
let series_values: Result<Vec<_>, _> = column DataType::UInt8 => {
.values let series_values: Result<Vec<_>, _> = column
.iter() .values
.map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u8))) .iter()
.collect(); .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u8)))
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::UInt16 => { }
let series_values: Result<Vec<_>, _> = column DataType::UInt16 => {
.values let series_values: Result<Vec<_>, _> = column
.iter() .values
.map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u16))) .iter()
.collect(); .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u16)))
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::UInt32 => { }
let series_values: Result<Vec<_>, _> = column DataType::UInt32 => {
.values let series_values: Result<Vec<_>, _> = column
.iter() .values
.map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u32))) .iter()
.collect(); .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u32)))
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::UInt64 => { }
let series_values: Result<Vec<_>, _> = column DataType::UInt64 => {
.values let series_values: Result<Vec<_>, _> = column
.iter() .values
.map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u64))) .iter()
.collect(); .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as u64)))
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::Int8 => { }
let series_values: Result<Vec<_>, _> = column DataType::Int8 => {
.values let series_values: Result<Vec<_>, _> = column
.iter() .values
.map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as i8))) .iter()
.collect(); .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as i8)))
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::Int16 => { }
let series_values: Result<Vec<_>, _> = column DataType::Int16 => {
.values let series_values: Result<Vec<_>, _> = column
.iter() .values
.map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as i16))) .iter()
.collect(); .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as i16)))
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::Int32 => { }
let series_values: Result<Vec<_>, _> = column DataType::Int32 => {
.values let series_values: Result<Vec<_>, _> = column
.iter() .values
.map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as i32))) .iter()
.collect(); .map(|v| value_to_option(v, |v| value_to_int(v).map(|v| v as i32)))
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::Int64 => { }
let series_values: Result<Vec<_>, _> = column DataType::Int64 => {
.values let series_values: Result<Vec<_>, _> = column
.iter() .values
.map(|v| value_to_option(v, value_to_int)) .iter()
.collect(); .map(|v| value_to_option(v, value_to_int))
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::Boolean => { }
let series_values: Result<Vec<_>, _> = column DataType::Boolean => {
.values let series_values: Result<Vec<_>, _> = column
.iter() .values
.map(|v| value_to_option(v, |v| v.as_bool())) .iter()
.collect(); .map(|v| value_to_option(v, |v| v.as_bool()))
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::String => { }
let series_values: Result<Vec<_>, _> = column DataType::String => {
.values let series_values: Result<Vec<_>, _> = column
.iter() .values
.map(|v| value_to_option(v, |v| v.coerce_string())) .iter()
.collect(); .map(|v| value_to_option(v, |v| v.coerce_string()))
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::Binary | DataType::BinaryOffset => { }
let series_values: Result<Vec<_>, _> = DataType::Binary | DataType::BinaryOffset => {
column.values.iter().map(|v| v.coerce_binary()).collect(); let series_values: Result<Vec<_>, _> =
Ok(Series::new(name, series_values?)) column.values.iter().map(|v| v.coerce_binary()).collect();
} Ok(Series::new(name, series_values?))
DataType::Object(_, _) => value_to_series(name, &column.values), }
DataType::Duration(time_unit) => { DataType::Object(_, _) => value_to_series(name, &column.values),
let series_values: Result<Vec<_>, _> = column DataType::Duration(time_unit) => {
.values let series_values: Result<Vec<_>, _> = column
.iter() .values
.map(|v| { .iter()
value_to_option(v, |v| { .map(|v| {
v.as_duration().map(|v| nanos_from_timeunit(v, *time_unit)) value_to_option(v, |v| {
}) v.as_duration().map(|v| nanos_from_timeunit(v, *time_unit))
}) })
.collect(); })
Ok(Series::new(name, series_values?)) .collect();
} Ok(Series::new(name, series_values?))
DataType::List(list_type) => { }
match input_type_list_to_series(&name, list_type.as_ref(), &column.values) { DataType::List(list_type) => {
Ok(series) => Ok(series), match input_type_list_to_series(&name, list_type.as_ref(), &column.values) {
Err(_) => { Ok(series) => Ok(series),
// An error case will occur when there are lists of mixed types. Err(_) => {
// If this happens, fallback to object list // An error case will occur when there are lists of mixed types.
input_type_list_to_series( // If this happens, fallback to object list
&name, input_type_list_to_series(
&DataType::Object("unknown", None), &name,
&column.values, &DataType::Object("unknown", None),
) &column.values,
} )
} }
} }
DataType::Date => { }
let it = column.values.iter().map(|v| { DataType::Date => {
let it = column.values.iter().map(|v| {
if let Value::Date { val, .. } = &v {
Some(val.timestamp_nanos_opt().unwrap_or_default())
} else {
None
}
});
let res: DatetimeChunked = ChunkedArray::<Int64Type>::from_iter_options(name, it)
.into_datetime(TimeUnit::Nanoseconds, None);
Ok(res.into_series())
}
DataType::Datetime(tu, maybe_tz) => {
let dates = column
.values
.iter()
.map(|v| {
if let Value::Date { val, .. } = &v { if let Value::Date { val, .. } = &v {
Some(val.timestamp_nanos_opt().unwrap_or_default()) // If there is a timezone specified, make sure
} else { // the value is converted to it
None Ok(maybe_tz
} .as_ref()
}); .map(|tz| tz.parse::<Tz>().map(|tz| val.with_timezone(&tz)))
.transpose()
let res: DatetimeChunked = ChunkedArray::<Int64Type>::from_iter_options(name, it)
.into_datetime(TimeUnit::Nanoseconds, None);
Ok(res.into_series())
}
DataType::Datetime(tu, maybe_tz) => {
let dates = column
.values
.iter()
.map(|v| {
if let Value::Date { val, .. } = &v {
// If there is a timezone specified, make sure
// the value is converted to it
Ok(maybe_tz
.as_ref()
.map(|tz| tz.parse::<Tz>().map(|tz| val.with_timezone(&tz)))
.transpose()
.map_err(|e| ShellError::GenericError {
error: "Error parsing timezone".into(),
msg: "".into(),
span: None,
help: Some(e.to_string()),
inner: vec![],
})?
.and_then(|dt| dt.timestamp_nanos_opt())
.map(|nanos| nanos_from_timeunit(nanos, *tu)))
} else {
Ok(None)
}
})
.collect::<Result<Vec<Option<i64>>, ShellError>>()?;
let res: DatetimeChunked =
ChunkedArray::<Int64Type>::from_iter_options(name, dates.into_iter())
.into_datetime(*tu, maybe_tz.clone());
Ok(res.into_series())
}
DataType::Struct(fields) => {
let schema = Some(NuSchema::new(Schema::from_iter(fields.clone())));
// let mut structs: Vec<Series> = Vec::new();
let mut structs: HashMap<PlSmallStr, Series> = HashMap::new();
for v in column.values.iter() {
let mut column_values: ColumnMap = IndexMap::new();
let record = v.as_record()?;
insert_record(&mut column_values, record.clone(), &schema)?;
let df = from_parsed_columns(column_values)?;
for name in df.df.get_column_names() {
let series = df
.df
.column(name)
.map_err(|e| ShellError::GenericError { .map_err(|e| ShellError::GenericError {
error: format!( error: "Error parsing timezone".into(),
"Error creating struct, could not get column name {name}: {e}"
),
msg: "".into(), msg: "".into(),
span: None, span: None,
help: None, help: Some(e.to_string()),
inner: vec![], inner: vec![],
})? })?
.as_materialized_series(); .and_then(|dt| dt.timestamp_nanos_opt())
.map(|nanos| nanos_from_timeunit(nanos, *tu)))
} else {
Ok(None)
}
})
.collect::<Result<Vec<Option<i64>>, ShellError>>()?;
if let Some(v) = structs.get_mut(name) { let res: DatetimeChunked =
let _ = v.append(series) ChunkedArray::<Int64Type>::from_iter_options(name, dates.into_iter())
.into_datetime(*tu, maybe_tz.clone());
Ok(res.into_series())
}
DataType::Struct(fields) => {
let schema = Some(NuSchema::new(Schema::from_iter(fields.clone())));
// let mut structs: Vec<Series> = Vec::new();
let mut structs: HashMap<PlSmallStr, Series> = HashMap::new();
for v in column.values.iter() {
let mut column_values: ColumnMap = IndexMap::new();
let record = v.as_record()?;
insert_record(&mut column_values, record.clone(), &schema)?;
let df = from_parsed_columns(column_values)?;
for name in df.df.get_column_names() {
let series = df
.df
.column(name)
.map_err(|e| ShellError::GenericError {
error: format!(
"Error creating struct, could not get column name {name}: {e}"
),
msg: "".into(),
span: None,
help: None,
inner: vec![],
})?
.as_materialized_series();
if let Some(v) = structs.get_mut(name) {
let _ = v.append(series)
.map_err(|e| ShellError::GenericError { .map_err(|e| ShellError::GenericError {
error: format!("Error creating struct, could not append to series for col {name}: {e}"), error: format!("Error creating struct, could not append to series for col {name}: {e}"),
msg: "".into(), msg: "".into(),
@ -530,44 +507,32 @@ fn typed_column_to_series(name: PlSmallStr, column: TypedColumn) -> Result<Serie
help: None, help: None,
inner: vec![], inner: vec![],
})?; })?;
} else { } else {
structs.insert(name.clone(), series.to_owned()); structs.insert(name.clone(), series.to_owned());
}
} }
} }
let structs: Vec<Series> = structs.into_values().collect();
let chunked = StructChunked::from_series(
column.name().to_owned(),
structs.len(),
structs.iter(),
)
.map_err(|e| ShellError::GenericError {
error: format!("Error creating struct: {e}"),
msg: "".into(),
span: None,
help: None,
inner: vec![],
})?;
Ok(chunked.into_series())
} }
_ => Err(ShellError::GenericError {
error: format!("Error creating dataframe: Unsupported type: {column_type:?}"), let structs: Vec<Series> = structs.into_values().collect();
msg: "".into(),
span: None, let chunked =
help: None, StructChunked::from_series(column.name().to_owned(), structs.len(), structs.iter())
inner: vec![], .map_err(|e| ShellError::GenericError {
}), error: format!("Error creating struct: {e}"),
msg: "".into(),
span: None,
help: None,
inner: vec![],
})?;
Ok(chunked.into_series())
} }
} else { _ => Err(ShellError::GenericError {
Err(ShellError::GenericError { error: format!("Error creating dataframe: Unsupported type: {column_type:?}"),
error: "Passed a type column with no type".into(),
msg: "".into(), msg: "".into(),
span: None, span: None,
help: None, help: None,
inner: vec![], inner: vec![],
}) }),
} }
} }