different data every time so activities are distinguishable

This commit is contained in:
phiresky 2024-07-11 00:54:38 +02:00
parent a40fea3e76
commit dd9c89e4ef

View file

@ -41,6 +41,12 @@ static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(0);
/// Maximum number of successful sends to allow out of order
const MAX_SUCCESSFULS: usize = 1000;
/// in prod mode, try to collect multiple send results at the same time to reduce load
#[cfg(not(test))]
static MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE: usize = 4;
#[cfg(test)]
static MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE: usize = 0;
pub(crate) struct InstanceWorker {
instance: Instance,
stop: CancellationToken,
@ -104,7 +110,7 @@ impl InstanceWorker {
let need_wait_for_event = (in_flight != 0 && self.state.fail_count > 0)
|| successfuls.len() >= MAX_SUCCESSFULS
|| in_flight >= self.federation_worker_config.concurrent_sends_per_instance;
if need_wait_for_event || receive_send_result.len() > 4 {
if need_wait_for_event || receive_send_result.len() > MIN_ACTIVITY_SEND_RESULTS_TO_HANDLE {
// if len() > 0 then this does not block and allows us to write to db more often
// if len is 0 then this means we wait for something to change our above conditions,
// which can only happen by an event sent into the channel
@ -423,7 +429,7 @@ mod test {
};
use lemmy_utils::error::LemmyResult;
use reqwest::StatusCode;
use serde_json::Value;
use serde_json::{json, Value};
use serial_test::serial;
use std::{fs::File, io::BufReader};
use test_context::{test_context, AsyncTestContext};
@ -474,7 +480,7 @@ mod test {
.unwrap_or(1);
let fed_config = FederationWorkerConfig {
concurrent_sends_per_instance
concurrent_sends_per_instance,
};
spawn(InstanceWorker::init_and_loop(
instance.clone(),
@ -548,6 +554,7 @@ mod test {
let rcv = data.stats_receiver.recv().await.unwrap();
assert_eq!(data.instance.id, rcv.state.instance_id);
assert_eq!(Some(sent.id), rcv.state.last_successful_id);
tracing::debug!("received second stats");
data.cleanup().await?;
@ -594,6 +601,42 @@ mod test {
Ok(())
}
#[test_context(Data)]
#[tokio::test]
#[traced_test]
#[serial]
/// this test sends 15 activities, waits and checks they have all been received, then sends 50,
/// etc
async fn test_send_15_50_200(data: &mut Data) -> LemmyResult<()> {
tracing::debug!("hello world");
// first receive at startup
let rcv = data.stats_receiver.recv().await.unwrap();
tracing::debug!("received first stats");
assert_eq!(data.instance.id, rcv.state.instance_id);
// assert_eq!(Some(ActivityId(0)), rcv.state.last_successful_id);
// let last_id_before = rcv.state.last_successful_id.unwrap();
let counts = vec![15, 50, 200];
for count in counts {
tracing::debug!("sending {} activities", count);
let mut sent = Vec::new();
for _ in 0..count {
sent.push(send_activity(data.person.actor_id.clone(), &data.context, false).await?);
}
sleep(2 * *WORK_FINISHED_RECHECK_DELAY).await;
tracing::debug!("sent activity");
// receive for successfully sent activity
for i in 0..count {
let inbox_rcv = data.inbox_receiver.recv().await.unwrap();
let parsed_activity = serde_json::from_str::<WithContext<Value>>(&inbox_rcv)?;
assert_eq!(&sent[i].data, parsed_activity.inner());
tracing::debug!("received activity");
}
}
Ok(())
}
#[test_context(Data)]
#[tokio::test]
#[serial]
@ -651,15 +694,20 @@ mod test {
wait: bool,
) -> LemmyResult<SentActivity> {
// create outgoing activity
let file = File::open("../apub/assets/lemmy/activities/voting/like_note.json")?;
let reader = BufReader::new(file);
let data = json!({
"actor": "http://ds9.lemmy.ml/u/lemmy_alpha",
"object": "http://ds9.lemmy.ml/comment/1",
"audience": "https://enterprise.lemmy.ml/c/tenforward",
"type": "Like",
"id": format!("http://ds9.lemmy.ml/activities/like/{}", uuid::Uuid::new_v4()),
});
let form = SentActivityForm {
ap_id: Url::parse(&format!(
"http://local.com/activity/{}",
uuid::Uuid::new_v4()
))?
.into(),
data: serde_json::from_reader(reader)?,
data,
sensitive: false,
send_inboxes: vec![Some(Url::parse("http://localhost:8085/inbox")?.into())],
send_all_instances: false,