perf: implement pool benchmark, make fairness an option

This commit is contained in:
Austin Bonander 2020-07-01 02:38:33 -07:00 committed by Ryan Leckey
parent 3a8dc0e211
commit 17e88ac1a4
12 changed files with 375 additions and 38 deletions

193
Cargo.lock generated
View file

@ -311,6 +311,18 @@ dependencies = [
"waker-fn",
]
[[package]]
name = "bstr"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31accafdb70df7871592c058eca3985b71104e15ac32f64706022c58867da931"
dependencies = [
"lazy_static",
"memchr",
"regex-automata",
"serde",
]
[[package]]
name = "bumpalo"
version = "3.4.0"
@ -367,6 +379,15 @@ dependencies = [
"serde_json",
]
[[package]]
name = "cast"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b9434b9a5aa1450faa3f9cb14ea0e8c53bb5d2b3c1bfd1ab4fc03e9f33fbfb0"
dependencies = [
"rustc_version",
]
[[package]]
name = "cc"
version = "1.0.57"
@ -506,6 +527,42 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d375c433320f6c5057ae04a04376eef4d04ce2801448cf8863a78da99107be4"
[[package]]
name = "criterion"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70daa7ceec6cf143990669a04c7df13391d55fb27bd4079d252fca774ba244d8"
dependencies = [
"atty",
"cast",
"clap",
"criterion-plot",
"csv",
"itertools",
"lazy_static",
"num-traits",
"oorandom",
"plotters",
"rayon",
"regex",
"serde",
"serde_cbor",
"serde_derive",
"serde_json",
"tinytemplate",
"walkdir",
]
[[package]]
name = "criterion-plot"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e022feadec601fba1649cfa83586381a4ad31c6bf3a9ab7d408118b05dd9889d"
dependencies = [
"cast",
"itertools",
]
[[package]]
name = "crossbeam-channel"
version = "0.4.2"
@ -584,6 +641,28 @@ dependencies = [
"subtle",
]
[[package]]
name = "csv"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00affe7f6ab566df61b4be3ce8cf16bc2576bca0963ceb0955e45d514bf9a279"
dependencies = [
"bstr",
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
dependencies = [
"memchr",
]
[[package]]
name = "data-encoding"
version = "2.2.1"
@ -944,6 +1023,12 @@ dependencies = [
"tokio-io",
]
[[package]]
name = "half"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d36fab90f82edc3c747f9d438e06cf0a491055896f2a279638bb5beed6c40177"
[[package]]
name = "hashbrown"
version = "0.8.0"
@ -1510,6 +1595,12 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
[[package]]
name = "oorandom"
version = "11.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a170cebd8021a008ea92e4db85a72f80b35df514ec664b296fdcbb654eac0b2c"
[[package]]
name = "opaque-debug"
version = "0.2.3"
@ -1788,6 +1879,18 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677"
[[package]]
name = "plotters"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d1685fbe7beba33de0330629da9d955ac75bd54f33d7b79f9a895590124f6bb"
dependencies = [
"js-sys",
"num-traits",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "ppv-lite86"
version = "0.2.8"
@ -1907,6 +2010,31 @@ dependencies = [
"rand_core",
]
[[package]]
name = "rayon"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62f02856753d04e03e26929f820d0a0a337ebe71f849801eea335d464b349080"
dependencies = [
"autocfg 1.0.0",
"crossbeam-deque",
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e92e15d89083484e11353891f1af602cc661426deb9564c298b270c726973280"
dependencies = [
"crossbeam-deque",
"crossbeam-queue",
"crossbeam-utils 0.7.2",
"lazy_static",
"num_cpus",
]
[[package]]
name = "redox_syscall"
version = "0.1.56"
@ -1925,6 +2053,15 @@ dependencies = [
"thread_local",
]
[[package]]
name = "regex-automata"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
dependencies = [
"byteorder",
]
[[package]]
name = "regex-syntax"
version = "0.6.18"
@ -2015,6 +2152,15 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "schannel"
version = "0.1.19"
@ -2097,6 +2243,16 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_cbor"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622"
dependencies = [
"half",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.114"
@ -2295,6 +2451,17 @@ dependencies = [
"trybuild",
]
[[package]]
name = "sqlx-bench"
version = "0.1.0"
dependencies = [
"criterion",
"dotenv",
"once_cell",
"sqlx",
"sqlx-rt",
]
[[package]]
name = "sqlx-cli"
version = "0.1.0-pre"
@ -2447,20 +2614,18 @@ dependencies = [
name = "sqlx-macros"
version = "0.4.0-pre"
dependencies = [
"async-std",
"dotenv",
"futures 0.3.5",
"heck",
"hex",
"once_cell",
"proc-macro2",
"quote",
"serde",
"serde_json",
"sha2 0.8.2",
"sqlx-core",
"sqlx-rt",
"syn",
"tokio 0.2.21",
"url 2.1.1",
]
@ -2473,6 +2638,7 @@ dependencies = [
"async-native-tls",
"async-std",
"native-tls",
"once_cell",
"tokio 0.2.21",
"tokio-native-tls",
]
@ -2794,6 +2960,16 @@ dependencies = [
"syn",
]
[[package]]
name = "tinytemplate"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d3dc76004a03cec1c5932bca4cdc2e39aaa798e3f82363dd94f9adf6098c12f"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "tinyvec"
version = "0.3.3"
@ -3106,6 +3282,17 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9571542c2ce85ce642e6b58b3364da2fb53526360dfb7c211add4f5c23105ff7"
[[package]]
name = "walkdir"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "777182bc735b6424e1a57516d35ed72cb8019d85c8c9bf536dccb3445c1a2f7d"
dependencies = [
"same-file",
"winapi 0.3.9",
"winapi-util",
]
[[package]]
name = "want"
version = "0.2.0"

View file

@ -6,6 +6,7 @@ members = [
"sqlx-macros",
"sqlx-test",
"sqlx-cli",
"sqlx-bench",
"examples/mysql/todos",
"examples/postgres/listen",
"examples/postgres/todos",

25
sqlx-bench/Cargo.toml Normal file
View file

@ -0,0 +1,25 @@
[package]
name = "sqlx-bench"
version = "0.1.0"
authors = ["Austin Bonander <austin@launchbadge.com>"]
edition = "2018"
publish = false
[features]
runtime-actix = ["sqlx/runtime-actix", "sqlx-rt/runtime-actix"]
runtime-async-std = ["sqlx/runtime-async-std", "sqlx-rt/runtime-async-std"]
runtime-tokio = ["sqlx/runtime-tokio", "sqlx-rt/runtime-tokio"]
postgres = ["sqlx/postgres"]
[dependencies]
criterion = "0.3.3"
dotenv = "0.15.0"
once_cell = "1.4"
sqlx = { version = "0.4.0-pre", path = "../", default-features = false }
sqlx-rt = { version = "0.1.0-pre", path = "../sqlx-rt", default-features = false }
[[bench]]
name = "pg_pool"
harness = false
required-features = ["postgres"]

View file

@ -0,0 +1,79 @@
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use sqlx::PgPool;
use std::time::{Duration, Instant};
fn bench_pgpool_acquire(c: &mut Criterion) {
let mut group = c.benchmark_group("bench_pgpool_acquire");
for &concurrent in [5u32, 10, 50, 100, 500, 1000, 5000 /*, 10_000, 50_000*/].iter() {
for &fair in [false, true].iter() {
let fairness = if fair { "(fair)" } else { "(unfair)" };
group.bench_with_input(
format!("{} concurrent {}", concurrent, fairness),
&(concurrent, fair),
|b, &(concurrent, fair)| do_bench_acquire(b, concurrent, fair),
);
}
}
group.finish();
}
fn do_bench_acquire(b: &mut Bencher, concurrent: u32, fair: bool) {
let pool = sqlx_rt::block_on(
PgPool::builder()
// we don't want timeouts because we want to see how the pool degrades
.connect_timeout(Duration::from_secs(3600))
// force the pool to start full
.min_size(50)
.max_size(50)
// we're not benchmarking `ping()`
.test_on_acquire(false)
.fair(fair)
.build(
&dotenv::var("DATABASE_URL").expect("DATABASE_URL must be set to run benchmarks"),
),
)
.expect("failed to open PgPool");
for _ in 0..concurrent {
let pool = pool.clone();
sqlx_rt::enter_runtime(|| {
sqlx_rt::spawn(async move {
while !pool.is_closed() {
let conn = match pool.acquire().await {
Ok(conn) => conn,
Err(sqlx::Error::PoolClosed) => break,
Err(e) => panic!("failed to acquire concurrent connection: {}", e),
};
// pretend we're using the connection
sqlx_rt::sleep(Duration::from_micros(500)).await;
drop(criterion::black_box(conn));
}
})
});
}
b.iter_custom(|iters| {
sqlx_rt::block_on(async {
// take the start time inside the future to make sure we only count once it's running
let start = Instant::now();
for _ in 0..iters {
criterion::black_box(
pool.acquire()
.await
.expect("failed to acquire connection for benchmark"),
);
}
start.elapsed()
})
});
sqlx_rt::block_on(pool.close());
}
criterion_group!(pg_pool, bench_pgpool_acquire);
criterion_main!(pg_pool);

View file

@ -47,10 +47,20 @@ impl<DB: Database> SharedPool<DB> {
pub(super) async fn close(&self) {
self.is_closed.store(true, Ordering::Release);
while self.idle_conns.pop().is_ok() {}
while let Ok(waker) = self.waiters.pop() {
waker.wake();
}
// ensure we wait until the pool is actually closed
while self.size() > 0 {
let _ = self
.idle_conns
.pop()
.map(|idle| Floating::from_idle(idle, self));
// yield to avoid starving the executor
sqlx_rt::yield_now().await;
}
}
#[inline]
@ -158,7 +168,7 @@ impl<DB: Database> SharedPool<DB> {
pub(super) async fn acquire<'s>(&'s self) -> Result<Floating<'s, Live<DB>>, Error> {
let start = Instant::now();
let deadline = start + self.options.connect_timeout;
let mut waited = false;
let mut waited = !self.options.fair;
// Unless the pool has been closed ...
while !self.is_closed() {

View file

@ -34,6 +34,7 @@ impl<DB: Database> Builder<DB> {
idle_timeout: None,
// If true, test the health of a connection on acquire
test_on_acquire: true,
fair: false,
},
}
}
@ -106,6 +107,11 @@ impl<DB: Database> Builder<DB> {
self
}
pub fn fair(mut self, fair: bool) -> Self {
self.options.fair = fair;
self
}
/// Consumes the builder, returning a new, initialized connection pool with the given
/// connection string.
///
@ -154,4 +160,5 @@ pub(crate) struct Options {
pub max_lifetime: Option<Duration>,
pub idle_timeout: Option<Duration>,
pub test_on_acquire: bool,
pub fair: bool,
}

View file

@ -19,9 +19,9 @@ proc-macro = true
default = [ "runtime-async-std" ]
# runtimes
runtime-async-std = [ "sqlx-core/runtime-async-std", "async-std" ]
runtime-tokio = [ "sqlx-core/runtime-tokio", "tokio", "once_cell" ]
runtime-actix = [ "sqlx-core/runtime-actix", "tokio", "once_cell" ]
runtime-async-std = [ "sqlx-core/runtime-async-std", "sqlx-rt/runtime-async-std" ]
runtime-tokio = [ "sqlx-core/runtime-tokio", "sqlx-rt/runtime-tokio" ]
runtime-actix = [ "sqlx-core/runtime-actix", "sqlx-rt/runtime-actix" ]
# offline building support
offline = ["sqlx-core/offline", "serde", "serde_json", "hex", "sha2"]
@ -41,18 +41,16 @@ uuid = [ "sqlx-core/uuid" ]
json = [ "sqlx-core/json", "serde_json" ]
[dependencies]
async-std = { version = "1.6.0", default-features = false, optional = true }
tokio = { version = "0.2.21", default-features = false, features = [ "rt-threaded" ], optional = true }
dotenv = { version = "0.15.0", default-features = false }
futures = { version = "0.3.4", default-features = false, features = [ "executor" ] }
hex = { version = "0.4.2", optional = true }
heck = "0.3.1"
proc-macro2 = { version = "1.0.9", default-features = false }
sqlx-core = { version = "0.4.0-pre", default-features = false, path = "../sqlx-core" }
sqlx-rt = { version = "0.1.0-pre", default-features = false, path = "../sqlx-rt" }
serde = { version = "1.0.111", optional = true }
serde_json = { version = "1.0.30", features = [ "preserve_order" ], optional = true }
sha2 = { version = "0.8.2", optional = true }
syn = { version = "1.0.30", default-features = false, features = [ "full" ] }
quote = { version = "1.0.6", default-features = false }
url = { version = "2.1.1", default-features = false }
once_cell = { version = "1.3", features = ["std"], optional = true }

View file

@ -15,7 +15,6 @@ type Result<T> = std::result::Result<T, Error>;
mod database;
mod derives;
mod query;
mod runtime;
fn macro_result(tokens: proc_macro2::TokenStream) -> TokenStream {
quote!(

View file

@ -10,11 +10,11 @@ use quote::{format_ident, quote};
use sqlx_core::connection::Connect;
use sqlx_core::database::Database;
use sqlx_core::describe::Describe;
use sqlx_rt::block_on;
use crate::database::DatabaseExt;
use crate::query::data::QueryData;
use crate::query::input::RecordType;
use crate::runtime::block_on;
mod args;
mod data;

View file

@ -1,23 +0,0 @@
// NOTE: this is separate from sqlx-rt because of the non-production nature of it
#[cfg(feature = "runtime-async-std")]
pub(crate) use async_std::task::block_on;
#[cfg(any(feature = "runtime-tokio", feature = "runtime-actix"))]
pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
use once_cell::sync::Lazy;
use tokio::runtime::{self, Runtime};
// lazily initialize a global runtime once for multiple invocations of the macros
static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
runtime::Builder::new()
// `.basic_scheduler()` requires calling `Runtime::block_on()` which needs mutability
.threaded_scheduler()
.enable_io()
.enable_time()
.build()
.expect("failed to initialize Tokio runtime")
});
RUNTIME.enter(|| futures::executor::block_on(future))
}

View file

@ -6,9 +6,9 @@ license = "MIT OR Apache-2.0"
edition = "2018"
[features]
runtime-actix = [ "actix-rt", "actix-threadpool", "tokio", "tokio-native-tls" ]
runtime-actix = [ "actix-rt", "actix-threadpool", "tokio", "tokio-native-tls", "once_cell" ]
runtime-async-std = [ "async-std", "async-native-tls" ]
runtime-tokio = [ "tokio", "tokio-native-tls" ]
runtime-tokio = [ "tokio", "tokio-native-tls", "once_cell" ]
[dependencies]
async-native-tls = { version = "0.3.3", optional = true }
@ -18,3 +18,4 @@ async-std = { version = "1.6.0", features = [ "unstable" ], optional = true }
tokio = { version = "0.2.21", optional = true, features = [ "blocking", "fs", "tcp", "uds", "macros", "rt-core", "rt-threaded", "time", "dns", "io-util" ] }
tokio-native-tls = { version = "0.1.0", optional = true }
native-tls = "0.2.4"
once_cell = { version = "1.3", features = ["std"], optional = true }

View file

@ -114,3 +114,56 @@ pub use async_std::os::unix::net::UnixStream;
#[cfg(all(feature = "async-native-tls", not(feature = "tokio-native-tls")))]
pub use async_native_tls::{Error as TlsError, TlsConnector, TlsStream};
#[cfg(all(
feature = "runtime-async-std",
not(any(feature = "runtime-actix", feature = "runtime-tokio"))
))]
pub(crate) use async_std::task::block_on;
#[cfg(all(
feature = "runtime-async-std",
not(any(feature = "runtime-actix", feature = "runtime-tokio"))
))]
pub fn enter_runtime<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
// no-op for async-std
f()
}
#[cfg(all(
any(feature = "runtime-tokio", feature = "runtime-actix"),
not(feature = "runtime-async-std")
))]
pub use tokio_runtime::{block_on, enter_runtime};
#[cfg(any(feature = "runtime-tokio", feature = "runtime-actix"))]
mod tokio_runtime {
use once_cell::sync::Lazy;
use tokio::runtime::{self, Runtime};
// lazily initialize a global runtime once for multiple invocations of the macros
static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
runtime::Builder::new()
// `.basic_scheduler()` requires calling `Runtime::block_on()` which needs mutability
.threaded_scheduler()
.enable_io()
.enable_time()
.build()
.expect("failed to initialize Tokio runtime")
});
#[cfg(any(feature = "runtime-tokio", feature = "runtime-actix"))]
pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
RUNTIME.enter(|| RUNTIME.handle().block_on(future))
}
pub fn enter_runtime<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
RUNTIME.enter(f)
}
}