Fix up ctrl+C handling in into_sqlite (#12130)

I noticed that ctrl+C handling wasn't fully wired up in `into sqlite`,
for some data types we were ignoring ctrl+C presses.

I fixed that up and also made sure we roll back the current transaction
when cancelling (without that, I think we leak memory and database
locks).
This commit is contained in:
Reilly Wood 2024-03-08 12:06:06 -08:00 committed by GitHub
parent 14d1c67863
commit 71ffd04ae7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -164,17 +164,23 @@ fn operate(
let file_name: Spanned<String> = call.req(engine_state, stack, 0)?;
let table_name: Option<Spanned<String>> = call.get_flag(engine_state, stack, "table-name")?;
let table = Table::new(&file_name, table_name)?;
let ctrl_c = engine_state.ctrlc.clone();
match action(input, table, span) {
match action(input, table, span, ctrl_c) {
Ok(val) => Ok(val.into_pipeline_data()),
Err(e) => Err(e),
}
}
fn action(input: PipelineData, table: Table, span: Span) -> Result<Value, ShellError> {
fn action(
input: PipelineData,
table: Table,
span: Span,
ctrl_c: Option<Arc<AtomicBool>>,
) -> Result<Value, ShellError> {
match input {
PipelineData::ListStream(list_stream, _) => {
insert_in_transaction(list_stream.stream, list_stream.ctrlc, span, table)
insert_in_transaction(list_stream.stream, span, table, ctrl_c)
}
PipelineData::Value(
Value::List {
@ -182,9 +188,9 @@ fn action(input: PipelineData, table: Table, span: Span) -> Result<Value, ShellE
internal_span,
},
_,
) => insert_in_transaction(vals.into_iter(), None, internal_span, table),
) => insert_in_transaction(vals.into_iter(), internal_span, table, ctrl_c),
PipelineData::Value(val, _) => {
insert_in_transaction(std::iter::once(val), None, span, table)
insert_in_transaction(std::iter::once(val), span, table, ctrl_c)
}
_ => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list".into(),
@ -197,9 +203,9 @@ fn action(input: PipelineData, table: Table, span: Span) -> Result<Value, ShellE
fn insert_in_transaction(
stream: impl Iterator<Item = Value>,
ctrlc: Option<Arc<AtomicBool>>,
span: Span,
mut table: Table,
ctrl_c: Option<Arc<AtomicBool>>,
) -> Result<Value, ShellError> {
let mut stream = stream.peekable();
let first_val = match stream.peek() {
@ -210,10 +216,16 @@ fn insert_in_transaction(
let table_name = table.name().clone();
let tx = table.try_init(first_val)?;
// insert all the records
stream.try_for_each(|stream_value| {
if let Some(ref ctrlc) = ctrlc {
for stream_value in stream {
if let Some(ref ctrlc) = ctrl_c {
if ctrlc.load(Ordering::Relaxed) {
tx.rollback().map_err(|e| ShellError::GenericError {
error: "Failed to rollback SQLite transaction".into(),
msg: e.to_string(),
span: None,
help: None,
inner: Vec::new(),
})?;
return Err(ShellError::InterruptedByUser { span: None });
}
}
@ -249,8 +261,8 @@ fn insert_in_transaction(
inner: Vec::new(),
})?;
result
})?;
result?
}
tx.commit().map_err(|e| ShellError::GenericError {
error: "Failed to commit SQLite transaction".into(),