perf: box MySqlConnection to reduce sizes of futures (#3265)

This commit is contained in:
Stepan Tubanov 2024-06-06 05:19:30 +04:00 committed by GitHub
parent 4d9f67b7b4
commit 5da0f73746
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 84 additions and 60 deletions

View file

@ -4,7 +4,7 @@ use futures_core::future::BoxFuture;
use crate::collation::{CharSet, Collation};
use crate::common::StatementCache;
use crate::connection::{tls, MySqlStream, MAX_PACKET_SIZE};
use crate::connection::{tls, MySqlConnectionInner, MySqlStream, MAX_PACKET_SIZE};
use crate::error::Error;
use crate::net::{Socket, WithSocket};
use crate::protocol::connect::{
@ -25,10 +25,12 @@ impl MySqlConnection {
let stream = handshake.await?;
Ok(Self {
stream,
transaction_depth: 0,
cache_statement: StatementCache::new(options.statement_cache_capacity),
log_settings: options.log_settings.clone(),
inner: Box::new(MySqlConnectionInner {
stream,
transaction_depth: 0,
cache_statement: StatementCache::new(options.statement_cache_capacity),
log_settings: options.log_settings.clone(),
}),
})
}
}

View file

@ -32,19 +32,22 @@ impl MySqlConnection {
// https://dev.mysql.com/doc/internals/en/com-stmt-prepare.html
// https://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html#packet-COM_STMT_PREPARE_OK
self.stream.send_packet(Prepare { query: sql }).await?;
self.inner
.stream
.send_packet(Prepare { query: sql })
.await?;
let ok: PrepareOk = self.stream.recv().await?;
let ok: PrepareOk = self.inner.stream.recv().await?;
// the parameter definitions are very unreliable so we skip over them
// as we have little use
if ok.params > 0 {
for _ in 0..ok.params {
let _def: ColumnDefinition = self.stream.recv().await?;
let _def: ColumnDefinition = self.inner.stream.recv().await?;
}
self.stream.maybe_recv_eof().await?;
self.inner.stream.maybe_recv_eof().await?;
}
// the column definitions are berefit the type information from the
@ -54,7 +57,7 @@ impl MySqlConnection {
let mut columns = Vec::new();
let column_names = if ok.columns > 0 {
recv_result_metadata(&mut self.stream, ok.columns as usize, &mut columns).await?
recv_result_metadata(&mut self.inner.stream, ok.columns as usize, &mut columns).await?
} else {
Default::default()
};
@ -73,7 +76,7 @@ impl MySqlConnection {
&mut self,
sql: &str,
) -> Result<(u32, MySqlStatementMetadata), Error> {
if let Some(statement) = self.cache_statement.get_mut(sql) {
if let Some(statement) = self.inner.cache_statement.get_mut(sql) {
// <MySqlStatementMetadata> is internally reference-counted
return Ok((*statement).clone());
}
@ -81,8 +84,15 @@ impl MySqlConnection {
let (id, metadata) = self.prepare_statement(sql).await?;
// in case of the cache being full, close the least recently used statement
if let Some((id, _)) = self.cache_statement.insert(sql, (id, metadata.clone())) {
self.stream.send_packet(StmtClose { statement: id }).await?;
if let Some((id, _)) = self
.inner
.cache_statement
.insert(sql, (id, metadata.clone()))
{
self.inner
.stream
.send_packet(StmtClose { statement: id })
.await?;
}
Ok((id, metadata))
@ -96,10 +106,10 @@ impl MySqlConnection {
persistent: bool,
) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
{
let mut logger = QueryLogger::new(sql, self.log_settings.clone());
let mut logger = QueryLogger::new(sql, self.inner.log_settings.clone());
self.stream.wait_until_ready().await?;
self.stream.waiting.push_back(Waiting::Result);
self.inner.stream.wait_until_ready().await?;
self.inner.stream.waiting.push_back(Waiting::Result);
Ok(Box::pin(try_stream! {
// make a slot for the shared column data
@ -108,13 +118,13 @@ impl MySqlConnection {
let mut columns = Arc::new(Vec::new());
let (mut column_names, format, mut needs_metadata) = if let Some(arguments) = arguments {
if persistent && self.cache_statement.is_enabled() {
if persistent && self.inner.cache_statement.is_enabled() {
let (id, metadata) = self
.get_or_prepare_statement(sql)
.await?;
// https://dev.mysql.com/doc/internals/en/com-stmt-execute.html
self.stream
self.inner.stream
.send_packet(StatementExecute {
statement: id,
arguments: &arguments,
@ -128,20 +138,20 @@ impl MySqlConnection {
.await?;
// https://dev.mysql.com/doc/internals/en/com-stmt-execute.html
self.stream
self.inner.stream
.send_packet(StatementExecute {
statement: id,
arguments: &arguments,
})
.await?;
self.stream.send_packet(StmtClose { statement: id }).await?;
self.inner.stream.send_packet(StmtClose { statement: id }).await?;
(metadata.column_names, MySqlValueFormat::Binary, false)
}
} else {
// https://dev.mysql.com/doc/internals/en/com-query.html
self.stream.send_packet(Query(sql)).await?;
self.inner.stream.send_packet(Query(sql)).await?;
(Arc::default(), MySqlValueFormat::Text, true)
};
@ -149,7 +159,7 @@ impl MySqlConnection {
loop {
// query response is a meta-packet which may be one of:
// Ok, Err, ResultSet, or (unhandled) LocalInfileRequest
let mut packet = self.stream.recv_packet().await?;
let mut packet = self.inner.stream.recv_packet().await?;
if packet[0] == 0x00 || packet[0] == 0xff {
// first packet in a query response is OK or ERR
@ -170,31 +180,31 @@ impl MySqlConnection {
continue;
}
self.stream.waiting.pop_front();
self.inner.stream.waiting.pop_front();
return Ok(());
}
// otherwise, this first packet is the start of the result-set metadata,
*self.stream.waiting.front_mut().unwrap() = Waiting::Row;
*self.inner.stream.waiting.front_mut().unwrap() = Waiting::Row;
let num_columns = packet.get_uint_lenenc() as usize; // column count
if needs_metadata {
column_names = Arc::new(recv_result_metadata(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?);
column_names = Arc::new(recv_result_metadata(&mut self.inner.stream, num_columns, Arc::make_mut(&mut columns)).await?);
} else {
// next time we hit here, it'll be a new result set and we'll need the
// full metadata
needs_metadata = true;
recv_result_columns(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?;
recv_result_columns(&mut self.inner.stream, num_columns, Arc::make_mut(&mut columns)).await?;
}
// finally, there will be none or many result-rows
loop {
let packet = self.stream.recv_packet().await?;
let packet = self.inner.stream.recv_packet().await?;
if packet[0] == 0xfe && packet.len() < 9 {
let eof = packet.eof(self.stream.capabilities)?;
let eof = packet.eof(self.inner.stream.capabilities)?;
r#yield!(Either::Left(MySqlQueryResult {
rows_affected: 0,
@ -203,11 +213,11 @@ impl MySqlConnection {
if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
// more result sets exist, continue to the next one
*self.stream.waiting.front_mut().unwrap() = Waiting::Result;
*self.inner.stream.waiting.front_mut().unwrap() = Waiting::Result;
break;
}
self.stream.waiting.pop_front();
self.inner.stream.waiting.pop_front();
return Ok(());
}
@ -290,14 +300,17 @@ impl<'c> Executor<'c> for &'c mut MySqlConnection {
'c: 'e,
{
Box::pin(async move {
self.stream.wait_until_ready().await?;
self.inner.stream.wait_until_ready().await?;
let metadata = if self.cache_statement.is_enabled() {
let metadata = if self.inner.cache_statement.is_enabled() {
self.get_or_prepare_statement(sql).await?.1
} else {
let (id, metadata) = self.prepare_statement(sql).await?;
self.stream.send_packet(StmtClose { statement: id }).await?;
self.inner
.stream
.send_packet(StmtClose { statement: id })
.await?;
metadata
};
@ -316,11 +329,14 @@ impl<'c> Executor<'c> for &'c mut MySqlConnection {
'c: 'e,
{
Box::pin(async move {
self.stream.wait_until_ready().await?;
self.inner.stream.wait_until_ready().await?;
let (id, metadata) = self.prepare_statement(sql).await?;
self.stream.send_packet(StmtClose { statement: id }).await?;
self.inner
.stream
.send_packet(StmtClose { statement: id })
.await?;
let columns = (&*metadata.columns).clone();

View file

@ -23,6 +23,10 @@ const MAX_PACKET_SIZE: u32 = 1024;
/// A connection to a MySQL database.
pub struct MySqlConnection {
pub(crate) inner: Box<MySqlConnectionInner>,
}
pub(crate) struct MySqlConnectionInner {
// underlying TCP stream,
// wrapped in a potentially TLS stream,
// wrapped in a buffered stream
@ -50,8 +54,8 @@ impl Connection for MySqlConnection {
fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
Box::pin(async move {
self.stream.send_packet(Quit).await?;
self.stream.shutdown().await?;
self.inner.stream.send_packet(Quit).await?;
self.inner.stream.shutdown().await?;
Ok(())
})
@ -59,16 +63,16 @@ impl Connection for MySqlConnection {
fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
Box::pin(async move {
self.stream.shutdown().await?;
self.inner.stream.shutdown().await?;
Ok(())
})
}
fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
self.stream.wait_until_ready().await?;
self.stream.send_packet(Ping).await?;
self.stream.recv_ok().await?;
self.inner.stream.wait_until_ready().await?;
self.inner.stream.send_packet(Ping).await?;
self.inner.stream.recv_ok().await?;
Ok(())
})
@ -76,17 +80,18 @@ impl Connection for MySqlConnection {
#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
self.stream.wait_until_ready().boxed()
self.inner.stream.wait_until_ready().boxed()
}
fn cached_statements_size(&self) -> usize {
self.cache_statement.len()
self.inner.cache_statement.len()
}
fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
while let Some((statement_id, _)) = self.cache_statement.remove_lru() {
self.stream
while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() {
self.inner
.stream
.send_packet(StmtClose {
statement: statement_id,
})
@ -99,7 +104,7 @@ impl Connection for MySqlConnection {
#[doc(hidden)]
fn should_flush(&self) -> bool {
!self.stream.write_buffer().is_empty()
!self.inner.stream.write_buffer().is_empty()
}
fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
@ -110,6 +115,6 @@ impl Connection for MySqlConnection {
}
fn shrink_buffers(&mut self) {
self.stream.shrink_buffers();
self.inner.stream.shrink_buffers();
}
}

View file

@ -71,8 +71,8 @@ impl ConnectOptions for MySqlConnectOptions {
if self.set_names {
options.push(format!(
r#"NAMES {} COLLATE {}"#,
conn.stream.charset.as_str(),
conn.stream.collation.as_str()
conn.inner.stream.charset.as_str(),
conn.inner.stream.collation.as_str()
))
}

View file

@ -16,10 +16,10 @@ impl TransactionManager for MySqlTransactionManager {
fn begin(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.transaction_depth;
let depth = conn.inner.transaction_depth;
conn.execute(&*begin_ansi_transaction_sql(depth)).await?;
conn.transaction_depth = depth + 1;
conn.inner.transaction_depth = depth + 1;
Ok(())
})
@ -27,11 +27,11 @@ impl TransactionManager for MySqlTransactionManager {
fn commit(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.transaction_depth;
let depth = conn.inner.transaction_depth;
if depth > 0 {
conn.execute(&*commit_ansi_transaction_sql(depth)).await?;
conn.transaction_depth = depth - 1;
conn.inner.transaction_depth = depth - 1;
}
Ok(())
@ -40,11 +40,11 @@ impl TransactionManager for MySqlTransactionManager {
fn rollback(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.transaction_depth;
let depth = conn.inner.transaction_depth;
if depth > 0 {
conn.execute(&*rollback_ansi_transaction_sql(depth)).await?;
conn.transaction_depth = depth - 1;
conn.inner.transaction_depth = depth - 1;
}
Ok(())
@ -52,15 +52,16 @@ impl TransactionManager for MySqlTransactionManager {
}
fn start_rollback(conn: &mut MySqlConnection) {
let depth = conn.transaction_depth;
let depth = conn.inner.transaction_depth;
if depth > 0 {
conn.stream.waiting.push_back(Waiting::Result);
conn.stream.sequence_id = 0;
conn.stream
conn.inner.stream.waiting.push_back(Waiting::Result);
conn.inner.stream.sequence_id = 0;
conn.inner
.stream
.write_packet(Query(&*rollback_ansi_transaction_sql(depth)));
conn.transaction_depth = depth - 1;
conn.inner.transaction_depth = depth - 1;
}
}
}