join and from derived tables (#5477)

This commit is contained in:
Fernando Herrera 2022-05-08 11:12:03 +01:00 committed by GitHub
parent 374757f286
commit 061e9294b3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 372 additions and 69 deletions

View file

@ -29,26 +29,15 @@ impl Command for AliasExpr {
}
fn examples(&self) -> Vec<Example> {
vec![
Example {
vec![Example {
description: "Creates an alias for a column selection",
example: "db col name_a | db as new_a",
result: None,
},
Example {
description: "Creates an alias for a table",
example: r#"db open name
| db select a
| db from table_a
| db as table_a_new
| db describe"#,
result: None,
},
]
}]
}
fn search_terms(&self) -> Vec<&str> {
vec!["database", "column", "expression"]
vec!["database", "alias", "column"]
}
fn run(

View file

@ -0,0 +1,66 @@
use crate::{database::values::definitions::ConnectionDb, SQLiteDatabase};
use nu_protocol::{ShellError, Value};
use sqlparser::ast::{ObjectName, Statement, TableAlias, TableFactor};
pub fn value_into_table_factor(
table: Value,
connection: &ConnectionDb,
alias: Option<TableAlias>,
) -> Result<TableFactor, ShellError> {
match table {
Value::String { val, .. } => {
let ident = sqlparser::ast::Ident {
value: val,
quote_style: None,
};
Ok(TableFactor::Table {
name: ObjectName(vec![ident]),
alias,
args: Vec::new(),
with_hints: Vec::new(),
})
}
Value::CustomValue { span, .. } => {
let db = SQLiteDatabase::try_from_value(table)?;
if &db.connection != connection {
return Err(ShellError::GenericError(
"Incompatible connections".into(),
"trying to join on table with different connection".into(),
Some(span),
None,
Vec::new(),
));
}
match db.statement {
Some(statement) => match statement {
Statement::Query(query) => Ok(TableFactor::Derived {
lateral: false,
subquery: query,
alias,
}),
s => Err(ShellError::GenericError(
"Connection doesnt define a query".into(),
format!("Expected a connection with query. Got {}", s),
Some(span),
None,
Vec::new(),
)),
},
None => Err(ShellError::GenericError(
"Error creating derived table".into(),
"there is no statement defined yet".into(),
Some(span),
None,
Vec::new(),
)),
}
}
_ => Err(ShellError::UnsupportedInput(
"String or connection".into(),
table.span()?,
)),
}
}

View file

@ -1,13 +1,13 @@
use super::super::SQLiteDatabase;
use crate::database::values::definitions::ConnectionDb;
use super::{super::SQLiteDatabase, conversions::value_into_table_factor};
use nu_engine::CallExt;
use nu_protocol::{
ast::Call,
engine::{Command, EngineState, Stack},
Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, Span, SyntaxShape,
};
use sqlparser::ast::{
Ident, ObjectName, Query, Select, SetExpr, Statement, TableFactor, TableWithJoins,
Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, SyntaxShape, Value,
};
use sqlparser::ast::{Ident, Query, Select, SetExpr, Statement, TableAlias, TableWithJoins};
#[derive(Clone)]
pub struct FromDb;
@ -25,8 +25,14 @@ impl Command for FromDb {
Signature::build(self.name())
.required(
"select",
SyntaxShape::Any,
"table of derived table to select from",
)
.named(
"as",
SyntaxShape::String,
"Name of table to select from",
"Alias for the selected table",
Some('a'),
)
.category(Category::Custom("database".into()))
}
@ -50,22 +56,36 @@ impl Command for FromDb {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let table: String = call.req(engine_state, stack, 0)?;
let mut db = SQLiteDatabase::try_from_pipeline(input, call.head)?;
db.statement = match db.statement {
None => Some(create_statement(table)),
Some(statement) => Some(modify_statement(statement, table, call.head)?),
None => Some(create_statement(&db.connection, engine_state, stack, call)?),
Some(statement) => Some(modify_statement(
&db.connection,
statement,
engine_state,
stack,
call,
)?),
};
Ok(db.into_value(call.head).into_pipeline_data())
}
}
fn create_statement(table: String) -> Statement {
fn create_statement(
connection: &ConnectionDb,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
) -> Result<Statement, ShellError> {
let query = Query {
with: None,
body: SetExpr::Select(Box::new(create_select(table))),
body: SetExpr::Select(Box::new(create_select(
connection,
engine_state,
stack,
call,
)?)),
order_by: Vec::new(),
limit: None,
offset: None,
@ -73,42 +93,57 @@ fn create_statement(table: String) -> Statement {
lock: None,
};
Statement::Query(Box::new(query))
Ok(Statement::Query(Box::new(query)))
}
fn modify_statement(
connection: &ConnectionDb,
mut statement: Statement,
table: String,
span: Span,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
) -> Result<Statement, ShellError> {
match statement {
Statement::Query(ref mut query) => {
match query.body {
SetExpr::Select(ref mut select) => select.as_mut().from = create_from(table),
SetExpr::Select(ref mut select) => {
let table = create_table(connection, engine_state, stack, call)?;
select.from.push(table);
}
_ => {
query.as_mut().body = SetExpr::Select(Box::new(create_select(table)));
query.as_mut().body = SetExpr::Select(Box::new(create_select(
connection,
engine_state,
stack,
call,
)?));
}
};
Ok(statement)
}
s => Err(ShellError::GenericError(
"Connection doesnt define a statement".into(),
"Connection doesnt define a query".into(),
format!("Expected a connection with query. Got {}", s),
Some(span),
Some(call.head),
None,
Vec::new(),
)),
}
}
fn create_select(table: String) -> Select {
Select {
fn create_select(
connection: &ConnectionDb,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
) -> Result<Select, ShellError> {
Ok(Select {
distinct: false,
top: None,
projection: Vec::new(),
into: None,
from: create_from(table),
from: vec![create_table(connection, engine_state, stack, call)?],
lateral_views: Vec::new(),
selection: None,
group_by: Vec::new(),
@ -116,29 +151,32 @@ fn create_select(table: String) -> Select {
distribute_by: Vec::new(),
sort_by: Vec::new(),
having: None,
}
})
}
// This function needs more work
// It needs to define multi tables and joins
// I assume we will need to define expressions for the columns instead of strings
fn create_from(table: String) -> Vec<TableWithJoins> {
let ident = Ident {
value: table,
fn create_table(
connection: &ConnectionDb,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
) -> Result<TableWithJoins, ShellError> {
let alias = call
.get_flag::<String>(engine_state, stack, "as")?
.map(|alias| TableAlias {
name: Ident {
value: alias,
quote_style: None,
};
},
columns: Vec::new(),
});
let table_factor = TableFactor::Table {
name: ObjectName(vec![ident]),
alias: None,
args: Vec::new(),
with_hints: Vec::new(),
};
let select_table: Value = call.req(engine_state, stack, 0)?;
let table_factor = value_into_table_factor(select_table, connection, alias)?;
let table = TableWithJoins {
relation: table_factor,
joins: Vec::new(),
};
vec![table]
Ok(table)
}

View file

@ -0,0 +1,178 @@
use super::{super::SQLiteDatabase, conversions::value_into_table_factor};
use crate::database::values::{definitions::ConnectionDb, dsl::ExprDb};
use nu_engine::CallExt;
use nu_protocol::{
ast::Call,
engine::{Command, EngineState, Stack},
Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, SyntaxShape, Value,
};
use sqlparser::ast::{
Ident, Join, JoinConstraint, JoinOperator, Select, SetExpr, Statement, TableAlias,
};
#[derive(Clone)]
pub struct JoinDb;
impl Command for JoinDb {
fn name(&self) -> &str {
"db join"
}
fn usage(&self) -> &str {
"Joins with another table or derived table. Default join type is inner"
}
fn signature(&self) -> Signature {
Signature::build(self.name())
.required(
"table",
SyntaxShape::Any,
"table or derived table to join on",
)
.required("on", SyntaxShape::Any, "expression to join tables")
.named(
"as",
SyntaxShape::String,
"Alias for the selected join",
Some('a'),
)
.switch("left", "left outer join", Some('l'))
.switch("right", "right outer join", Some('r'))
.switch("outer", "full outer join", Some('o'))
.switch("cross", "cross join", Some('c'))
.category(Category::Custom("database".into()))
}
fn search_terms(&self) -> Vec<&str> {
vec!["database", "join"]
}
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "",
example: "",
result: None,
}]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let mut db = SQLiteDatabase::try_from_pipeline(input, call.head)?;
db.statement = match db.statement {
Some(statement) => Some(modify_statement(
&db.connection,
statement,
engine_state,
stack,
call,
)?),
None => {
return Err(ShellError::GenericError(
"Error creating join".into(),
"there is no statement defined yet".into(),
Some(call.head),
None,
Vec::new(),
))
}
};
Ok(db.into_value(call.head).into_pipeline_data())
}
}
fn modify_statement(
connection: &ConnectionDb,
mut statement: Statement,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
) -> Result<Statement, ShellError> {
match statement {
Statement::Query(ref mut query) => {
match &mut query.body {
SetExpr::Select(ref mut select) => {
modify_from(connection, select, engine_state, stack, call)?
}
s => {
return Err(ShellError::GenericError(
"Connection doesnt define a select".into(),
format!("Expected a connection with select. Got {}", s),
Some(call.head),
None,
Vec::new(),
))
}
};
Ok(statement)
}
s => Err(ShellError::GenericError(
"Connection doesnt define a query".into(),
format!("Expected a connection with query. Got {}", s),
Some(call.head),
None,
Vec::new(),
)),
}
}
fn modify_from(
connection: &ConnectionDb,
select: &mut Select,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
) -> Result<(), ShellError> {
match select.from.last_mut() {
Some(table) => {
let alias = call
.get_flag::<String>(engine_state, stack, "as")?
.map(|alias| TableAlias {
name: Ident {
value: alias,
quote_style: None,
},
columns: Vec::new(),
});
let join_table: Value = call.req(engine_state, stack, 0)?;
let table_factor = value_into_table_factor(join_table, connection, alias)?;
let on_expr: Value = call.req(engine_state, stack, 1)?;
let on_expr = ExprDb::try_from_value(&on_expr)?;
let join_on = if call.has_flag("left") {
JoinOperator::LeftOuter(JoinConstraint::On(on_expr.into_native()))
} else if call.has_flag("right") {
JoinOperator::RightOuter(JoinConstraint::On(on_expr.into_native()))
} else if call.has_flag("outer") {
JoinOperator::FullOuter(JoinConstraint::On(on_expr.into_native()))
} else {
JoinOperator::Inner(JoinConstraint::On(on_expr.into_native()))
};
let join = Join {
relation: table_factor,
join_operator: join_on,
};
table.joins.push(join);
Ok(())
}
None => Err(ShellError::GenericError(
"Connection without table defined".into(),
"Expected a table defined".into(),
Some(call.head),
None,
Vec::new(),
)),
}
}

View file

@ -1,3 +1,6 @@
// Conversions between value and sqlparser objects
pub mod conversions;
mod alias;
mod and;
mod col;
@ -7,6 +10,7 @@ mod describe;
mod from;
mod function;
mod group_by;
mod join;
mod limit;
mod open;
mod or;
@ -32,6 +36,7 @@ use describe::DescribeDb;
use from::FromDb;
use function::FunctionExpr;
use group_by::GroupByDb;
use join::JoinDb;
use limit::LimitDb;
use open::OpenDb;
use or::OrDb;
@ -56,20 +61,21 @@ pub fn add_database_decls(working_set: &mut StateWorkingSet) {
bind_command!(
AliasExpr,
AndDb,
CollectDb,
ColExpr,
CollectDb,
Database,
DescribeDb,
FromDb,
FunctionExpr,
GroupByDb,
QueryDb,
JoinDb,
LimitDb,
ProjectionDb,
OpenDb,
OrderByDb,
OrDb,
OverExpr,
QueryDb,
ProjectionDb,
SchemaDb,
TestingDb,
WhereDb

View file

@ -68,7 +68,7 @@ impl Command for SchemaDb {
cols.push("db_filename".into());
vals.push(Value::String {
val: sqlite_db.path.to_string_lossy().to_string(),
val: sqlite_db.connection.to_string(),
span,
});

View file

@ -1,3 +1,7 @@
use nu_protocol::{ShellError, Span};
use serde::{Deserialize, Serialize};
use std::{fmt::Display, path::PathBuf};
pub mod db;
pub mod db_column;
pub mod db_constraint;
@ -6,3 +10,24 @@ pub mod db_index;
pub mod db_row;
pub mod db_schema;
pub mod db_table;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum ConnectionDb {
Path(PathBuf),
}
impl Display for ConnectionDb {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Path(path) => write!(f, "{}", path.to_str().unwrap_or("")),
}
}
}
impl ConnectionDb {
pub fn as_path(&self, _span: Span) -> Result<&PathBuf, ShellError> {
match self {
Self::Path(path) => Ok(path),
}
}
}

View file

@ -1,3 +1,4 @@
use super::definitions::ConnectionDb;
use crate::database::values::definitions::{
db::Db, db_column::DbColumn, db_constraint::DbConstraint, db_foreignkey::DbForeignKey,
db_index::DbIndex, db_table::DbTable,
@ -19,14 +20,14 @@ pub struct SQLiteDatabase {
// I considered storing a SQLite connection here, but decided against it because
// 1) YAGNI, 2) it's not obvious how cloning a connection could work, 3) state
// management gets tricky quick. Revisit this approach if we find a compelling use case.
pub path: PathBuf,
pub connection: ConnectionDb,
pub statement: Option<Statement>,
}
impl SQLiteDatabase {
pub fn new(path: &Path) -> Self {
Self {
path: PathBuf::from(path),
connection: ConnectionDb::Path(PathBuf::from(path)),
statement: None,
}
}
@ -51,7 +52,7 @@ impl SQLiteDatabase {
match value {
Value::CustomValue { val, span } => match val.as_any().downcast_ref::<Self>() {
Some(db) => Ok(Self {
path: db.path.clone(),
connection: db.connection.clone(),
statement: db.statement.clone(),
}),
None => Err(ShellError::CantConvert(
@ -83,7 +84,7 @@ impl SQLiteDatabase {
}
pub fn query(&self, sql: &Spanned<String>, call_span: Span) -> Result<Value, ShellError> {
let db = open_sqlite_db(&self.path, call_span)?;
let db = open_sqlite_db(self.connection.as_path(call_span)?, call_span)?;
run_sql_query(db, sql).map_err(|e| {
ShellError::GenericError(
"Failed to query SQLite database".into(),
@ -112,7 +113,7 @@ impl SQLiteDatabase {
span: call_span,
};
let db = open_sqlite_db(&self.path, call_span)?;
let db = open_sqlite_db(self.connection.as_path(call_span)?, call_span)?;
run_sql_query(db, &sql).map_err(|e| {
ShellError::GenericError(
"Failed to query SQLite database".into(),
@ -127,7 +128,7 @@ impl SQLiteDatabase {
pub fn describe(&self, span: Span) -> Value {
let cols = vec!["connection".to_string(), "query".to_string()];
let connection = Value::String {
val: self.path.to_str().unwrap_or("").to_string(),
val: self.connection.to_string(),
span,
};
@ -146,7 +147,7 @@ impl SQLiteDatabase {
}
pub fn open_connection(&self) -> Result<Connection, rusqlite::Error> {
let conn = match Connection::open(self.path.to_string_lossy().to_string()) {
let conn = match Connection::open(self.connection.to_string()) {
Ok(conn) => conn,
Err(err) => return Err(err),
};
@ -350,7 +351,7 @@ impl SQLiteDatabase {
impl CustomValue for SQLiteDatabase {
fn clone_value(&self, span: Span) -> Value {
let cloned = SQLiteDatabase {
path: self.path.clone(),
connection: self.connection.clone(),
statement: self.statement.clone(),
};
@ -365,7 +366,7 @@ impl CustomValue for SQLiteDatabase {
}
fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
let db = open_sqlite_db(&self.path, span)?;
let db = open_sqlite_db(self.connection.as_path(span)?, span)?;
read_entire_sqlite_db(db, span).map_err(|e| {
ShellError::GenericError(
"Failed to read from SQLite database".into(),
@ -387,7 +388,7 @@ impl CustomValue for SQLiteDatabase {
}
fn follow_path_string(&self, _column_name: String, span: Span) -> Result<Value, ShellError> {
let db = open_sqlite_db(&self.path, span)?;
let db = open_sqlite_db(self.connection.as_path(span)?, span)?;
read_single_table(db, _column_name, span).map_err(|e| {
ShellError::GenericError(