federation tests: ensure server stop after test and random activity id

This commit is contained in:
phiresky 2024-06-29 19:21:15 +02:00
parent e3fef895a1
commit 76c6487390
4 changed files with 71 additions and 18 deletions

23
Cargo.lock generated
View file

@ -3129,11 +3129,13 @@ dependencies = [
"reqwest 0.11.27",
"serde_json",
"serial_test",
"test-context",
"tokio",
"tokio-util",
"tracing",
"tracing-test",
"url",
"uuid",
]
[[package]]
@ -5716,6 +5718,27 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "test-context"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6676ab8513edfd2601a108621103fdb45cac9098305ca25ec93f7023b06b05d9"
dependencies = [
"futures",
"test-context-macros",
]
[[package]]
name = "test-context-macros"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ea17a2dc368aeca6f554343ced1b1e31f76d63683fa8016e5844bd7a5144a1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "thiserror"
version = "1.0.61"

View file

@ -40,3 +40,5 @@ serial_test = { workspace = true }
url.workspace = true
actix-web.workspace = true
tracing-test = "0.2.5"
uuid.workspace = true
test-context = "0.3.0"

View file

@ -28,7 +28,8 @@ use tokio_util::sync::CancellationToken;
/// Decrease the delays of the federation queue.
/// Should only be used for federation tests since it significantly increases CPU and DB load of the
/// federation queue.
/// federation queue. This is intentionally a separate flag from other flags like debug_assertions,
/// since this is a invasive change we only need rarely.
pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy<bool> = Lazy::new(|| {
std::env::var("LEMMY_TEST_FAST_FEDERATION")
.map(|s| !s.is_empty())

View file

@ -405,7 +405,7 @@ mod test {
http_signatures::generate_actor_keypair,
protocol::context::WithContext,
};
use actix_web::{web, App, HttpResponse, HttpServer};
use actix_web::{dev::ServerHandle, web, App, HttpResponse, HttpServer};
use lemmy_api_common::utils::{generate_inbox_url, generate_shared_inbox_url};
use lemmy_db_schema::{
newtypes::DbUrl,
@ -420,10 +420,12 @@ mod test {
use serde_json::Value;
use serial_test::serial;
use std::{fs::File, io::BufReader};
use test_context::{test_context, AsyncTestContext};
use tokio::{
select,
spawn,
sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver},
task::JoinHandle,
};
use tracing_test::traced_test;
use url::Url;
@ -435,6 +437,8 @@ mod test {
stats_receiver: UnboundedReceiver<FederationQueueStateWithDomain>,
inbox_receiver: UnboundedReceiver<String>,
cancel: CancellationToken,
cleaned_up: bool,
wait_stop_server: ServerHandle,
}
impl Data {
@ -459,7 +463,7 @@ mod test {
// listen for received activities in background
let cancel_ = cancel.clone();
listen_activities(inbox_sender, cancel_).await?;
let wait_stop_server = listen_activities(inbox_sender, cancel_)?;
let fed_config = FederationWorkerConfig {
concurrent_sends_per_instance: 1,
@ -481,30 +485,48 @@ mod test {
stats_receiver,
inbox_receiver,
cancel,
wait_stop_server,
cleaned_up: false,
})
}
async fn cleanup(&self) -> LemmyResult<()> {
async fn cleanup(&mut self) -> LemmyResult<()> {
if self.cleaned_up {
return Ok(());
}
self.cleaned_up = true;
self.cancel.cancel();
sleep(*WORK_FINISHED_RECHECK_DELAY).await;
Instance::delete_all(&mut self.context.pool()).await?;
Person::delete(&mut self.context.pool(), self.person.id).await?;
self.wait_stop_server.stop(true).await;
Ok(())
}
}
/// In order to guarantee that the webserver is stopped via the cleanup function,
/// we implement a test context.
impl AsyncTestContext for Data {
async fn setup() -> Data {
Data::init().await.unwrap()
}
async fn teardown(mut self) {
self.cleanup().await.unwrap()
}
}
#[test_context(Data)]
#[tokio::test]
#[traced_test]
#[serial]
async fn test_stats() -> LemmyResult<()> {
let mut data = Data::init().await?;
async fn test_stats(data: &mut Data) -> LemmyResult<()> {
tracing::debug!("hello world");
// first receive at startup
let rcv = data.stats_receiver.recv().await.unwrap();
tracing::debug!("received first stats");
assert_eq!(data.instance.id, rcv.state.instance_id);
assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id);
// assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id);
// let last_id_before = rcv.state.last_successful_id.unwrap();
let sent = send_activity(data.person.actor_id.clone(), &data.context).await?;
tracing::debug!("sent activity");
@ -528,16 +550,15 @@ mod test {
let rcv = data.stats_receiver.try_recv();
assert_eq!(Some(TryRecvError::Disconnected), rcv.err());
let inbox_rcv = data.inbox_receiver.try_recv();
assert_eq!(Some(TryRecvError::Empty), inbox_rcv.err());
assert_eq!(Some(TryRecvError::Disconnected), inbox_rcv.err());
Ok(())
}
#[test_context(Data)]
#[tokio::test]
#[serial]
async fn test_update_instance() -> LemmyResult<()> {
let mut data = Data::init().await?;
async fn test_update_instance(data: &mut Data) -> LemmyResult<()> {
let form = InstanceForm::builder()
.domain(data.instance.domain.clone())
.updated(None)
@ -557,10 +578,10 @@ mod test {
Ok(())
}
async fn listen_activities(
fn listen_activities(
inbox_sender: UnboundedSender<String>,
cancel: CancellationToken,
) -> LemmyResult<()> {
) -> LemmyResult<ServerHandle> {
let run = HttpServer::new(move || {
App::new()
.app_data(actix_web::web::Data::new(inbox_sender.clone()))
@ -577,13 +598,15 @@ mod test {
})
.bind(("127.0.0.1", 8085))?
.run();
let handle = run.handle();
tokio::spawn(async move {
select! {
run.await.unwrap();
/*select! {
_ = run => {},
_ = cancel.cancelled() => {}
}
_ = cancel.cancelled() => { }
}*/
});
Ok(())
Ok(handle)
}
async fn send_activity(actor_id: DbUrl, context: &LemmyContext) -> LemmyResult<SentActivity> {
@ -591,7 +614,11 @@ mod test {
let file = File::open("../apub/assets/lemmy/activities/voting/like_note.json")?;
let reader = BufReader::new(file);
let form = SentActivityForm {
ap_id: Url::parse("http://local.com/activity/1")?.into(),
ap_id: Url::parse(&format!(
"http://local.com/activity/{}",
uuid::Uuid::new_v4().to_string()
))?
.into(),
data: serde_json::from_reader(reader)?,
sensitive: false,
send_inboxes: vec![Some(Url::parse("http://localhost:8085/inbox")?.into())],