From 375d9a2a3cf9c023ac6ac832d98f1b456fb9e2ce Mon Sep 17 00:00:00 2001 From: phiresky Date: Sat, 9 Sep 2023 18:25:03 +0200 Subject: [PATCH] Persistent, performant, reliable federation queue (#3605) * persistent activity queue * fixes * fixes * make federation workers function callable from outside * log federation instances * dead instance detection not needed here * taplo fmt * split federate bin/lib * minor fix * better logging * log * create struct to hold cancellable task for readability * use boxfuture for readability * reset submodule * fix * fix lint * swap * remove json column, use separate array columns instead * some review comments * make worker a struct for readability * minor readability * add local filter to community follower view * remove separate lemmy_federate entry point * fix remaining duration * address review comments mostly * fix lint * upgrade actitypub-fed to simpler interface * fix sql format * increase delays a bit * fixes after merge * remove selectable * fix instance selectable * add comment * start federation based on latest id at the time * rename federate process args * dead instances in one query * filter follow+report activities by local * remove synchronous federation remove activity sender queue * lint * fix federation tests by waiting for results to change * fix fed test * fix comment report * wait some more * Apply suggestions from code review Co-authored-by: SorteKanin * fix most remaining tests * wait until private messages * fix community tests * fix community tests * move arg parse * use instance_id instead of domain in federation_queue_state table --------- Co-authored-by: Dessalines Co-authored-by: SorteKanin --- Cargo.lock | 190 +++++++---- Cargo.toml | 7 +- api_tests/prepare-drone-federation-test.sh | 4 +- api_tests/run-federation-test.sh | 1 - api_tests/src/comment.spec.ts | 145 ++++++-- api_tests/src/community.spec.ts | 41 ++- api_tests/src/follow.spec.ts | 8 +- api_tests/src/post.spec.ts | 76 ++++- api_tests/src/private_message.spec.ts | 31 +- api_tests/src/shared.ts | 26 +- api_tests/tsconfig.json | 2 +- crates/api_common/src/send_activity.rs | 59 +--- crates/api_crud/src/post/create.rs | 7 +- .../apub/src/activities/block/block_user.rs | 7 +- .../src/activities/block/undo_block_user.rs | 7 +- .../apub/src/activities/community/announce.rs | 3 +- .../activities/community/collection_add.rs | 13 +- .../activities/community/collection_remove.rs | 13 +- .../src/activities/community/lock_page.rs | 11 +- crates/apub/src/activities/community/mod.rs | 11 +- .../apub/src/activities/community/report.rs | 8 +- .../apub/src/activities/community/update.rs | 12 +- .../activities/create_or_update/comment.rs | 5 +- .../src/activities/create_or_update/post.rs | 3 +- .../create_or_update/private_message.rs | 3 +- .../src/activities/deletion/delete_user.rs | 7 +- crates/apub/src/activities/deletion/mod.rs | 9 +- .../apub/src/activities/following/accept.rs | 7 +- .../apub/src/activities/following/follow.rs | 7 +- .../src/activities/following/undo_follow.rs | 7 +- crates/apub/src/activities/mod.rs | 61 ++-- crates/apub/src/activities/voting/mod.rs | 6 +- crates/apub/src/fetcher/mod.rs | 1 + .../src/fetcher/site_or_community_or_user.rs | 108 ++++++ crates/apub/src/fetcher/user_or_community.rs | 11 + crates/apub/src/lib.rs | 2 +- crates/apub/src/objects/community.rs | 8 + crates/apub/src/objects/instance.rs | 19 +- crates/apub/src/objects/person.rs | 12 +- crates/db_schema/src/impls/activity.rs | 14 +- crates/db_schema/src/impls/instance.rs | 51 ++- crates/db_schema/src/impls/site.rs | 32 +- crates/db_schema/src/newtypes.rs | 8 +- crates/db_schema/src/schema.rs | 24 ++ crates/db_schema/src/source/activity.rs | 71 +++- crates/db_schema/src/source/instance.rs | 2 +- crates/db_schema/src/utils.rs | 3 + crates/db_views_actor/Cargo.toml | 1 + .../src/community_follower_view.rs | 36 +- crates/federate/Cargo.toml | 41 +++ crates/federate/src/federation_queue_state.rs | 63 ++++ crates/federate/src/lib.rs | 207 ++++++++++++ crates/federate/src/util.rs | 198 +++++++++++ crates/federate/src/worker.rs | 312 ++++++++++++++++++ crates/utils/src/error.rs | 5 + crates/utils/src/lib.rs | 11 - .../down.sql | 13 + .../up.sql | 32 ++ src/lib.rs | 158 +++++++-- src/main.rs | 9 +- src/root_span_builder.rs | 6 +- 61 files changed, 1878 insertions(+), 377 deletions(-) create mode 100644 crates/apub/src/fetcher/site_or_community_or_user.rs create mode 100644 crates/federate/Cargo.toml create mode 100644 crates/federate/src/federation_queue_state.rs create mode 100644 crates/federate/src/lib.rs create mode 100644 crates/federate/src/util.rs create mode 100644 crates/federate/src/worker.rs create mode 100644 migrations/2023-08-01-115243_persistent-activity-queue/down.sql create mode 100644 migrations/2023-08-01-115243_persistent-activity-queue/up.sql diff --git a/Cargo.lock b/Cargo.lock index 78732bee2..bb72acb69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10,9 +10,9 @@ checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" [[package]] name = "activitypub_federation" -version = "0.5.0-beta.2" +version = "0.5.0-beta.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8210e0ac4675753f9288c1102fb4940b22e5868308383c286b07eb63f3ff4c65" +checksum = "509cbafa1b42e01b7ca76c26298814a6638825df4fd67aef2f4c9d36a39c2b6d" dependencies = [ "activitystreams-kinds", "actix-web", @@ -24,12 +24,14 @@ dependencies = [ "derive_builder", "dyn-clone", "enum_delegate", + "futures", "futures-core", "http", "http-signature-normalization", "http-signature-normalization-reqwest", "httpdate", "itertools 0.10.5", + "moka", "once_cell", "openssl", "pin-project-lite", @@ -401,6 +403,54 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15c4c2c83f81532e5845a733998b6971faca23490340a418e9b72a3ec9de12ea" + +[[package]] +name = "anstyle-parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +dependencies = [ + "windows-sys 0.48.0", +] + +[[package]] +name = "anstyle-wincon" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +dependencies = [ + "anstyle", + "windows-sys 0.48.0", +] + [[package]] name = "anyhow" version = "1.0.71" @@ -898,40 +948,44 @@ dependencies = [ [[package]] name = "clap" -version = "4.0.32" +version = "4.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7db700bc935f9e43e88d00b0850dae18a63773cfbec6d8e070fccf7fef89a39" +checksum = "1d5f1946157a96594eb2d2c10eb7ad9a2b27518cb3000209dec700c35df9197d" dependencies = [ - "bitflags 1.3.2", + "clap_builder", "clap_derive", - "clap_lex", - "is-terminal", "once_cell", +] + +[[package]] +name = "clap_builder" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78116e32a042dd73c2901f0dc30790d20ff3447f3e3472fad359e8c3d282bcd6" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", "strsim", - "termcolor", ] [[package]] name = "clap_derive" -version = "4.0.21" +version = "4.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0177313f9f02afc995627906bbd8967e2be069f5261954222dac78290c2b9014" +checksum = "c9fd1a5729c4548118d7d70ff234a44868d00489a4b6597b0b020918a0e91a1a" dependencies = [ "heck", - "proc-macro-error", "proc-macro2", "quote", - "syn 1.0.103", + "syn 2.0.31", ] [[package]] name = "clap_lex" -version = "0.3.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8" -dependencies = [ - "os_str_bytes", -] +checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961" [[package]] name = "clokwerk" @@ -985,6 +1039,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b" +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + [[package]] name = "combine" version = "4.6.6" @@ -2111,15 +2171,6 @@ dependencies = [ "libc", ] -[[package]] -name = "hermit-abi" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.3.2" @@ -2454,18 +2505,6 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" -[[package]] -name = "is-terminal" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28dfb6c8100ccc63462345b67d1bbc3679177c75ee4bf59bf29c8b1d110b8189" -dependencies = [ - "hermit-abi 0.2.6", - "io-lifetimes", - "rustix 0.36.5", - "windows-sys 0.42.0", -] - [[package]] name = "itertools" version = "0.10.5" @@ -2742,6 +2781,7 @@ dependencies = [ name = "lemmy_db_views_actor" version = "0.18.1" dependencies = [ + "chrono", "diesel", "diesel-async", "lemmy_db_schema", @@ -2762,6 +2802,38 @@ dependencies = [ "ts-rs", ] +[[package]] +name = "lemmy_federate" +version = "0.18.1" +dependencies = [ + "activitypub_federation", + "anyhow", + "async-trait", + "bytes", + "chrono", + "diesel", + "diesel-async", + "enum_delegate", + "futures", + "lemmy_api_common", + "lemmy_apub", + "lemmy_db_schema", + "lemmy_db_views_actor", + "lemmy_utils", + "moka", + "once_cell", + "openssl", + "reqwest", + "reqwest-middleware", + "reqwest-tracing", + "serde", + "serde_json", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lemmy_routes" version = "0.18.1" @@ -2796,6 +2868,7 @@ dependencies = [ "actix-web", "actix-web-prom", "chrono", + "clap", "clokwerk", "console-subscriber", "diesel", @@ -2807,6 +2880,7 @@ dependencies = [ "lemmy_api_crud", "lemmy_apub", "lemmy_db_schema", + "lemmy_federate", "lemmy_routes", "lemmy_utils", "opentelemetry 0.19.0", @@ -3498,12 +3572,6 @@ dependencies = [ "hashbrown 0.12.3", ] -[[package]] -name = "os_str_bytes" -version = "6.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" - [[package]] name = "overload" version = "0.1.1" @@ -3881,30 +3949,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" -[[package]] -name = "proc-macro-error" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn 1.0.103", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" -dependencies = [ - "proc-macro2", - "quote", - "version_check", -] - [[package]] name = "proc-macro2" version = "1.0.64" @@ -5222,9 +5266,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.4" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ "bytes", "futures-core", @@ -5710,6 +5754,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5190c9442dcdaf0ddd50f37420417d219ae5261bbf5db120d0f9bab996c9cba1" +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "uuid" version = "1.4.0" diff --git a/Cargo.toml b/Cargo.toml index a9efec603..72ab87d4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ members = [ "crates/db_views_actor", "crates/db_views_actor", "crates/routes", + "crates/federate", ] [workspace.dependencies] @@ -67,7 +68,7 @@ lemmy_routes = { version = "=0.18.1", path = "./crates/routes" } lemmy_db_views = { version = "=0.18.1", path = "./crates/db_views" } lemmy_db_views_actor = { version = "=0.18.1", path = "./crates/db_views_actor" } lemmy_db_views_moderator = { version = "=0.18.1", path = "./crates/db_views_moderator" } -activitypub_federation = { version = "0.5.0-beta.2", default-features = false, features = [ +activitypub_federation = { version = "0.5.0-beta.3", default-features = false, features = [ "actix-web", ] } diesel = "2.1.0" @@ -88,7 +89,6 @@ tracing-error = "0.2.0" tracing-log = "0.1.3" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } url = { version = "2.4.0", features = ["serde"] } -url_serde = "0.2.0" reqwest = { version = "0.11.18", features = ["json", "blocking", "gzip"] } reqwest-middleware = "0.2.2" reqwest-tracing = "0.4.5" @@ -119,7 +119,6 @@ futures = "0.3.28" http = "0.2.9" percent-encoding = "2.3.0" rosetta-i18n = "0.1.3" -rand = "0.8.5" opentelemetry = { version = "0.19.0", features = ["rt-tokio"] } tracing-opentelemetry = { version = "0.19.0" } ts-rs = { version = "7.0.0", features = ["serde-compat", "chrono-impl"] } @@ -167,3 +166,5 @@ tokio-postgres-rustls = { workspace = true } chrono = { workspace = true } prometheus = { version = "0.13.3", features = ["process"], optional = true } actix-web-prom = { version = "0.6.0", optional = true } +clap = { version = "4.3.19", features = ["derive"] } +lemmy_federate = { version = "0.18.1", path = "crates/federate" } diff --git a/api_tests/prepare-drone-federation-test.sh b/api_tests/prepare-drone-federation-test.sh index ef1133287..ba6dd324b 100755 --- a/api_tests/prepare-drone-federation-test.sh +++ b/api_tests/prepare-drone-federation-test.sh @@ -4,7 +4,7 @@ set -e export RUST_BACKTRACE=1 -export RUST_LOG="warn,lemmy_server=debug,lemmy_api=debug,lemmy_api_common=debug,lemmy_api_crud=debug,lemmy_apub=debug,lemmy_db_schema=debug,lemmy_db_views=debug,lemmy_db_views_actor=debug,lemmy_db_views_moderator=debug,lemmy_routes=debug,lemmy_utils=debug,lemmy_websocket=debug" +export RUST_LOG="warn,lemmy_server=debug,lemmy_federate=debug,lemmy_api=debug,lemmy_api_common=debug,lemmy_api_crud=debug,lemmy_apub=debug,lemmy_db_schema=debug,lemmy_db_views=debug,lemmy_db_views_actor=debug,lemmy_db_views_moderator=debug,lemmy_routes=debug,lemmy_utils=debug,lemmy_websocket=debug" for INSTANCE in lemmy_alpha lemmy_beta lemmy_gamma lemmy_delta lemmy_epsilon; do echo "DB URL: ${LEMMY_DATABASE_URL} INSTANCE: $INSTANCE" @@ -26,7 +26,7 @@ if [ -z "$DO_WRITE_HOSTS_FILE" ]; then fi else for INSTANCE in lemmy-alpha lemmy-beta lemmy-gamma lemmy-delta lemmy-epsilon; do - echo "127.0.0.1 $INSTANCE" >> /etc/hosts + echo "127.0.0.1 $INSTANCE" >>/etc/hosts done fi diff --git a/api_tests/run-federation-test.sh b/api_tests/run-federation-test.sh index ff74744a1..3042fd344 100755 --- a/api_tests/run-federation-test.sh +++ b/api_tests/run-federation-test.sh @@ -2,7 +2,6 @@ set -e export LEMMY_DATABASE_URL=postgres://lemmy:password@localhost:5432 -export LEMMY_SYNCHRONOUS_FEDERATION=1 # currently this is true in debug by default, but still. pushd .. cargo build rm target/lemmy_server || true diff --git a/api_tests/src/comment.spec.ts b/api_tests/src/comment.spec.ts index 246497617..000c0b0ab 100644 --- a/api_tests/src/comment.spec.ts +++ b/api_tests/src/comment.spec.ts @@ -32,6 +32,8 @@ import { getPersonDetails, getReplies, getUnreadCount, + waitUntil, + delay, } from "./shared"; import { CommentView } from "lemmy-js-client/dist/types/CommentView"; @@ -42,6 +44,8 @@ beforeAll(async () => { await unfollows(); await followBeta(alpha); await followBeta(gamma); + // wait for FOLLOW_ADDITIONS_RECHECK_DELAY + await delay(2000); let betaCommunity = (await resolveBetaCommunity(alpha)).community; if (betaCommunity) { postOnAlphaRes = await createPost(alpha, betaCommunity.community.id); @@ -75,7 +79,10 @@ test("Create a comment", async () => { // Make sure that comment is liked on beta let betaComment = ( - await resolveComment(beta, commentRes.comment_view.comment) + await waitUntil( + () => resolveComment(beta, commentRes.comment_view.comment), + c => c.comment?.counts.score === 1, + ) ).comment; expect(betaComment).toBeDefined(); expect(betaComment?.community.local).toBe(true); @@ -108,7 +115,11 @@ test("Update a comment", async () => { // Make sure that post is updated on beta let betaCommentUpdated = ( - await resolveComment(beta, commentRes.comment_view.comment) + await waitUntil( + () => resolveComment(beta, commentRes.comment_view.comment), + c => + c.comment?.comment.content === "A jest test federated comment update", + ) ).comment; assertCommentFederation(betaCommentUpdated, updateCommentRes.comment_view); }); @@ -121,16 +132,18 @@ test("Delete a comment", async () => { let betaComment = ( await resolveComment(beta, commentRes.comment_view.comment) ).comment; - if (!betaComment) { throw "Missing beta comment before delete"; } // Find the comment on remote instance gamma let gammaComment = ( - await resolveComment(gamma, commentRes.comment_view.comment) + await waitUntil( + () => + resolveComment(gamma, commentRes.comment_view.comment).catch(e => e), + r => r !== "couldnt_find_object", + ) ).comment; - if (!gammaComment) { throw "Missing gamma comment (remote-home-remote replication) before delete"; } @@ -143,14 +156,16 @@ test("Delete a comment", async () => { expect(deleteCommentRes.comment_view.comment.deleted).toBe(true); // Make sure that comment is undefined on beta - await expect( - resolveComment(beta, commentRes.comment_view.comment), - ).rejects.toBe("couldnt_find_object"); + await waitUntil( + () => resolveComment(beta, commentRes.comment_view.comment).catch(e => e), + e => e === "couldnt_find_object", + ); // Make sure that comment is undefined on gamma after delete - await expect( - resolveComment(gamma, commentRes.comment_view.comment), - ).rejects.toBe("couldnt_find_object"); + await waitUntil( + () => resolveComment(gamma, commentRes.comment_view.comment).catch(e => e), + e => e === "couldnt_find_object", + ); // Test undeleting the comment let undeleteCommentRes = await deleteComment( @@ -162,7 +177,10 @@ test("Delete a comment", async () => { // Make sure that comment is undeleted on beta let betaComment2 = ( - await resolveComment(beta, commentRes.comment_view.comment) + await waitUntil( + () => resolveComment(beta, commentRes.comment_view.comment).catch(e => e), + e => e !== "couldnt_find_object", + ) ).comment; expect(betaComment2?.comment.deleted).toBe(false); assertCommentFederation(betaComment2, undeleteCommentRes.comment_view); @@ -257,8 +275,12 @@ test("Unlike a comment", async () => { // Lemmy automatically creates 1 like (vote) by author of comment. // Make sure that comment is liked (voted up) on gamma, downstream peer // This is testing replication from remote-home-remote (alpha-beta-gamma) + let gammaComment1 = ( - await resolveComment(gamma, commentRes.comment_view.comment) + await waitUntil( + () => resolveComment(gamma, commentRes.comment_view.comment), + c => c.comment?.counts.score === 1, + ) ).comment; expect(gammaComment1).toBeDefined(); expect(gammaComment1?.community.local).toBe(false); @@ -270,7 +292,10 @@ test("Unlike a comment", async () => { // Make sure that comment is unliked on beta let betaComment = ( - await resolveComment(beta, commentRes.comment_view.comment) + await waitUntil( + () => resolveComment(beta, commentRes.comment_view.comment), + c => c.comment?.counts.score === 0, + ) ).comment; expect(betaComment).toBeDefined(); expect(betaComment?.community.local).toBe(true); @@ -280,7 +305,10 @@ test("Unlike a comment", async () => { // Make sure that comment is unliked on gamma, downstream peer // This is testing replication from remote-home-remote (alpha-beta-gamma) let gammaComment = ( - await resolveComment(gamma, commentRes.comment_view.comment) + await waitUntil( + () => resolveComment(gamma, commentRes.comment_view.comment), + c => c.comment?.counts.score === 0, + ) ).comment; expect(gammaComment).toBeDefined(); expect(gammaComment?.community.local).toBe(false); @@ -290,7 +318,10 @@ test("Unlike a comment", async () => { test("Federated comment like", async () => { let commentRes = await createComment(alpha, postOnAlphaRes.post_view.post.id); - + await waitUntil( + () => resolveComment(beta, commentRes.comment_view.comment), + c => c.comment?.counts.score === 1, + ); // Find the comment on beta let betaComment = ( await resolveComment(beta, commentRes.comment_view.comment) @@ -304,11 +335,20 @@ test("Federated comment like", async () => { expect(like.comment_view.counts.score).toBe(2); // Get the post from alpha, check the likes - let postComments = await getComments(alpha, postOnAlphaRes.post_view.post.id); + let postComments = await waitUntil( + () => getComments(alpha, postOnAlphaRes.post_view.post.id), + c => c.comments[0].counts.score === 2, + ); expect(postComments.comments[0].counts.score).toBe(2); }); test("Reply to a comment from another instance, get notification", async () => { + let betaCommunity = (await resolveBetaCommunity(alpha)).community; + if (!betaCommunity) { + throw "Missing beta community"; + } + const postOnAlphaRes = await createPost(alpha, betaCommunity.community.id); + // Create a root-level trunk-branch comment on alpha let commentRes = await createComment(alpha, postOnAlphaRes.post_view.post.id); // find that comment id on beta @@ -338,11 +378,15 @@ test("Reply to a comment from another instance, get notification", async () => { // TODO not sure why, but a searchComment back to alpha, for the ap_id of betas // comment, isn't working. // let searchAlpha = await searchComment(alpha, replyRes.comment); - let postComments = await getComments(alpha, postOnAlphaRes.post_view.post.id); - // Note: in Lemmy 0.18.3 pre-release this is coming up 7 + let postComments = await waitUntil( + () => getComments(alpha, postOnAlphaRes.post_view.post.id), + pc => pc.comments.length >= 2, + ); + // Note: this test fails when run twice and this count will differ expect(postComments.comments.length).toBeGreaterThanOrEqual(2); let alphaComment = postComments.comments[0]; expect(alphaComment.comment.content).toBeDefined(); + expect(getCommentParentId(alphaComment.comment)).toBe( postComments.comments[1].comment.id, ); @@ -352,7 +396,10 @@ test("Reply to a comment from another instance, get notification", async () => { assertCommentFederation(alphaComment, replyRes.comment_view); // Did alpha get notified of the reply from beta? - let alphaUnreadCountRes = await getUnreadCount(alpha); + let alphaUnreadCountRes = await waitUntil( + () => getUnreadCount(alpha), + e => e.replies >= 1, + ); expect(alphaUnreadCountRes.replies).toBe(1); // check inbox of replies on alpha, fetching read/unread both @@ -394,7 +441,10 @@ test("Mention beta from alpha", async () => { expect(betaPost.post.name).toBe(postOnAlphaRes.post_view.post.name); // Make sure that both new comments are seen on beta and have parent/child relationship - let betaPostComments = await getComments(beta, betaPost.post.id); + let betaPostComments = await waitUntil( + () => getComments(beta, betaPost!.post.id), + c => c.comments[1].counts.score === 1, + ); expect(betaPostComments.comments.length).toBeGreaterThanOrEqual(2); // the trunk-branch root comment will be older than the mention reply comment, so index 1 let betaRootComment = betaPostComments.comments[1]; @@ -462,9 +512,9 @@ test("A and G subscribe to B (center) A posts, G mentions B, it gets announced t expect(commentRes.comment_view.counts.score).toBe(1); // Make sure alpha sees it - let alphaPostComments2 = await getComments( - alpha, - alphaPost.post_view.post.id, + let alphaPostComments2 = await waitUntil( + () => getComments(alpha, alphaPost.post_view.post.id), + e => !!e.comments[0], ); expect(alphaPostComments2.comments[0].comment.content).toBe(commentContent); expect(alphaPostComments2.comments[0].community.local).toBe(true); @@ -476,10 +526,19 @@ test("A and G subscribe to B (center) A posts, G mentions B, it gets announced t ); // Make sure beta has mentions - let mentionsRes = await getMentions(beta); - expect(mentionsRes.mentions[0].comment.content).toBe(commentContent); - expect(mentionsRes.mentions[0].community.local).toBe(false); - expect(mentionsRes.mentions[0].creator.local).toBe(false); + let relevantMention = await waitUntil( + () => + getMentions(beta).then(m => + m.mentions.find( + m => m.comment.ap_id === commentRes.comment_view.comment.ap_id, + ), + ), + e => !!e, + ); + if (!relevantMention) throw Error("could not find mention"); + expect(relevantMention.comment.content).toBe(commentContent); + expect(relevantMention.community.local).toBe(false); + expect(relevantMention.creator.local).toBe(false); // TODO this is failing because fetchInReplyTos aren't getting score // expect(mentionsRes.mentions[0].score).toBe(1); }); @@ -493,6 +552,16 @@ test("Check that activity from another instance is sent to third instance", asyn let gammaFollow = await followBeta(gamma); expect(gammaFollow.community_view.community.local).toBe(false); expect(gammaFollow.community_view.community.name).toBe("main"); + await waitUntil( + () => resolveBetaCommunity(alpha), + c => c.community?.subscribed === "Subscribed", + ); + await waitUntil( + () => resolveBetaCommunity(gamma), + c => c.community?.subscribed === "Subscribed", + ); + // FOLLOW_ADDITIONS_RECHECK_DELAY + await delay(2000); // Create a post on beta let betaPost = await createPost(beta, 2); @@ -525,7 +594,10 @@ test("Check that activity from another instance is sent to third instance", asyn expect(commentRes.comment_view.counts.score).toBe(1); // Make sure alpha sees it - let alphaPostComments2 = await getComments(alpha, alphaPost.post.id); + let alphaPostComments2 = await waitUntil( + () => getComments(alpha, alphaPost!.post.id), + e => !!e.comments[0], + ); expect(alphaPostComments2.comments[0].comment.content).toBe(commentContent); expect(alphaPostComments2.comments[0].community.local).toBe(false); expect(alphaPostComments2.comments[0].creator.local).toBe(false); @@ -595,7 +667,12 @@ test("Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde } let alphaPost = await getPost(alpha, alphaPostB.post.id); - let alphaPostComments = await getComments(alpha, alphaPostB.post.id); + let alphaPostComments = await waitUntil( + () => getComments(alpha, alphaPostB!.post.id), + c => + c.comments[1]?.comment.content === + parentCommentRes.comment_view.comment.content, + ); expect(alphaPost.post_view.post.name).toBeDefined(); assertCommentFederation( alphaPostComments.comments[1], @@ -632,8 +709,12 @@ test("Report a comment", async () => { await reportComment(alpha, alphaComment.id, randomString(10)) ).comment_report_view.comment_report; - let betaReport = (await listCommentReports(beta)).comment_reports[0] - .comment_report; + let betaReport = ( + await waitUntil( + () => listCommentReports(beta), + e => !!e.comment_reports[0], + ) + ).comment_reports[0].comment_report; expect(betaReport).toBeDefined(); expect(betaReport.resolved).toBe(false); expect(betaReport.original_comment_text).toBe( diff --git a/api_tests/src/community.spec.ts b/api_tests/src/community.spec.ts index 6a6195989..a2af84440 100644 --- a/api_tests/src/community.spec.ts +++ b/api_tests/src/community.spec.ts @@ -24,6 +24,8 @@ import { getComments, createComment, getCommunityByName, + waitUntil, + delay, } from "./shared"; beforeAll(async () => { @@ -85,6 +87,12 @@ test("Delete community", async () => { // Make sure the follow response went through expect(follow.community_view.community.local).toBe(false); + await waitUntil( + () => resolveCommunity(alpha, searchShort), + g => g.community?.subscribed === "Subscribed", + ); + // wait FOLLOW_ADDITIONS_RECHECK_DELAY + await delay(2000); let deleteCommunityRes = await deleteCommunity( beta, true, @@ -96,9 +104,9 @@ test("Delete community", async () => { ); // Make sure it got deleted on A - let communityOnAlphaDeleted = await getCommunity( - alpha, - alphaCommunity.community.id, + let communityOnAlphaDeleted = await waitUntil( + () => getCommunity(alpha, alphaCommunity!.community.id), + g => g.community_view.community.deleted, ); expect(communityOnAlphaDeleted.community_view.community.deleted).toBe(true); @@ -111,9 +119,9 @@ test("Delete community", async () => { expect(undeleteCommunityRes.community_view.community.deleted).toBe(false); // Make sure it got undeleted on A - let communityOnAlphaUnDeleted = await getCommunity( - alpha, - alphaCommunity.community.id, + let communityOnAlphaUnDeleted = await waitUntil( + () => getCommunity(alpha, alphaCommunity!.community.id), + g => !g.community_view.community.deleted, ); expect(communityOnAlphaUnDeleted.community_view.community.deleted).toBe( false, @@ -137,6 +145,10 @@ test("Remove community", async () => { // Make sure the follow response went through expect(follow.community_view.community.local).toBe(false); + await waitUntil( + () => resolveCommunity(alpha, searchShort), + g => g.community?.subscribed === "Subscribed", + ); let removeCommunityRes = await removeCommunity( beta, true, @@ -148,9 +160,9 @@ test("Remove community", async () => { ); // Make sure it got Removed on A - let communityOnAlphaRemoved = await getCommunity( - alpha, - alphaCommunity.community.id, + let communityOnAlphaRemoved = await waitUntil( + () => getCommunity(alpha, alphaCommunity!.community.id), + g => g.community_view.community.removed, ); expect(communityOnAlphaRemoved.community_view.community.removed).toBe(true); @@ -163,9 +175,9 @@ test("Remove community", async () => { expect(unremoveCommunityRes.community_view.community.removed).toBe(false); // Make sure it got undeleted on A - let communityOnAlphaUnRemoved = await getCommunity( - alpha, - alphaCommunity.community.id, + let communityOnAlphaUnRemoved = await waitUntil( + () => getCommunity(alpha, alphaCommunity!.community.id), + g => !g.community_view.community.removed, ); expect(communityOnAlphaUnRemoved.community_view.community.removed).toBe( false, @@ -195,7 +207,10 @@ test("Admin actions in remote community are not federated to origin", async () = } await followCommunity(gamma, true, gammaCommunity.community.id); gammaCommunity = ( - await resolveCommunity(gamma, communityRes.community.actor_id) + await waitUntil( + () => resolveCommunity(gamma, communityRes.community.actor_id), + g => g.community?.subscribed === "Subscribed", + ) ).community; if (!gammaCommunity) { throw "Missing gamma community"; diff --git a/api_tests/src/follow.spec.ts b/api_tests/src/follow.spec.ts index 72a3a2ce1..a0b43989b 100644 --- a/api_tests/src/follow.spec.ts +++ b/api_tests/src/follow.spec.ts @@ -7,6 +7,7 @@ import { followCommunity, unfollowRemotes, getSite, + waitUntil, } from "./shared"; beforeAll(async () => { @@ -23,7 +24,12 @@ test("Follow federated community", async () => { throw "Missing beta community"; } await followCommunity(alpha, true, betaCommunity.community.id); - betaCommunity = (await resolveBetaCommunity(alpha)).community; + betaCommunity = ( + await waitUntil( + () => resolveBetaCommunity(alpha), + c => c.community?.subscribed === "Subscribed", + ) + ).community; // Make sure the follow response went through expect(betaCommunity?.community.local).toBe(false); diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 52fe72d3d..cd3eec71b 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -34,6 +34,8 @@ import { getSite, unfollows, resolveCommunity, + waitUntil, + delay, } from "./shared"; import { PostView } from "lemmy-js-client/dist/types/PostView"; import { CreatePost } from "lemmy-js-client/dist/types/CreatePost"; @@ -80,7 +82,11 @@ test("Create a post", async () => { expect(postRes.post_view.counts.score).toBe(1); // Make sure that post is liked on beta - let betaPost = (await resolvePost(beta, postRes.post_view.post)).post; + const res = await waitUntil( + () => resolvePost(beta, postRes.post_view.post), + res => res.post?.counts.score === 1, + ); + let betaPost = res.post; expect(betaPost).toBeDefined(); expect(betaPost?.community.local).toBe(true); @@ -116,7 +122,12 @@ test("Unlike a post", async () => { expect(unlike2.post_view.counts.score).toBe(0); // Make sure that post is unliked on beta - let betaPost = (await resolvePost(beta, postRes.post_view.post)).post; + const betaPost = ( + await waitUntil( + () => resolvePost(beta, postRes.post_view.post), + b => b.post?.counts.score === 0, + ) + ).post; expect(betaPost).toBeDefined(); expect(betaPost?.community.local).toBe(true); expect(betaPost?.creator.local).toBe(false); @@ -129,9 +140,17 @@ test("Update a post", async () => { throw "Missing beta community"; } let postRes = await createPost(alpha, betaCommunity.community.id); + await waitUntil( + () => resolvePost(beta, postRes.post_view.post), + res => !!res.post, + ); let updatedName = "A jest test federated post, updated"; let updatedPost = await editPost(alpha, postRes.post_view.post); + await waitUntil( + () => resolvePost(beta, postRes.post_view.post), + res => res.post?.post.name === updatedName, + ); expect(updatedPost.post_view.post.name).toBe(updatedName); expect(updatedPost.post_view.community.local).toBe(false); expect(updatedPost.post_view.creator.local).toBe(true); @@ -197,8 +216,19 @@ test("Lock a post", async () => { throw "Missing beta community"; } await followCommunity(alpha, true, betaCommunity.community.id); - let postRes = await createPost(alpha, betaCommunity.community.id); + await waitUntil( + () => resolveBetaCommunity(alpha), + c => c.community?.subscribed === "Subscribed", + ); + // wait FOLLOW_ADDITIONS_RECHECK_DELAY (there's no API to wait for this currently) + await delay(2_000); + let postRes = await createPost(alpha, betaCommunity.community.id); + // wait for federation + await waitUntil( + () => searchPostLocal(beta, postRes.post_view.post), + res => !!res.posts[0], + ); // Lock the post let betaPost1 = (await resolvePost(beta, postRes.post_view.post)).post; if (!betaPost1) { @@ -208,7 +238,10 @@ test("Lock a post", async () => { expect(lockedPostRes.post_view.post.locked).toBe(true); // Make sure that post is locked on alpha - let searchAlpha = await searchPostLocal(alpha, postRes.post_view.post); + let searchAlpha = await waitUntil( + () => searchPostLocal(alpha, postRes.post_view.post), + res => res.posts[0]?.post.locked, + ); let alphaPost1 = searchAlpha.posts[0]; expect(alphaPost1.post.locked).toBe(true); @@ -220,7 +253,10 @@ test("Lock a post", async () => { expect(unlockedPost.post_view.post.locked).toBe(false); // Make sure that post is unlocked on alpha - let searchAlpha2 = await searchPostLocal(alpha, postRes.post_view.post); + let searchAlpha2 = await waitUntil( + () => searchPostLocal(alpha, postRes.post_view.post), + res => !res.posts[0]?.post.locked, + ); let alphaPost2 = searchAlpha2.posts[0]; expect(alphaPost2.community.local).toBe(false); expect(alphaPost2.creator.local).toBe(true); @@ -312,9 +348,11 @@ test("Remove a post from admin and community on same instance", async () => { await followBeta(alpha); let postRes = await createPost(alpha, betaCommunity.community.id); expect(postRes.post_view.post).toBeDefined(); - // Get the id for beta - let searchBeta = await searchPostLocal(beta, postRes.post_view.post); + let searchBeta = await waitUntil( + () => searchPostLocal(beta, postRes.post_view.post), + res => !!res.posts[0], + ); let betaPost = searchBeta.posts[0]; expect(betaPost).toBeDefined(); @@ -361,7 +399,7 @@ test("Enforce site ban for federated user", async () => { client: alpha.client, auth: alphaUserJwt.jwt ?? "", }; - let alphaUserActorId = (await getSite(alpha_user)).my_user?.local_user_view + const alphaUserActorId = (await getSite(alpha_user)).my_user?.local_user_view .person.actor_id; if (!alphaUserActorId) { throw "Missing alpha user actor id"; @@ -375,7 +413,10 @@ test("Enforce site ban for federated user", async () => { // alpha makes post in beta community, it federates to beta instance let postRes1 = await createPost(alpha_user, betaCommunity.community.id); - let searchBeta1 = await searchPostLocal(beta, postRes1.post_view.post); + let searchBeta1 = await waitUntil( + () => searchPostLocal(beta, postRes1.post_view.post), + res => !!res.posts[0], + ); expect(searchBeta1.posts[0]).toBeDefined(); // ban alpha from its instance @@ -388,7 +429,10 @@ test("Enforce site ban for federated user", async () => { expect(banAlpha.banned).toBe(true); // alpha ban should be federated to beta - let alphaUserOnBeta1 = await resolvePerson(beta, alphaUserActorId); + let alphaUserOnBeta1 = await waitUntil( + () => resolvePerson(beta, alphaUserActorId), + res => res.person?.person.banned ?? false, + ); expect(alphaUserOnBeta1.person?.person.banned).toBe(true); // existing alpha post should be removed on beta @@ -406,7 +450,10 @@ test("Enforce site ban for federated user", async () => { // alpha makes new post in beta community, it federates let postRes2 = await createPost(alpha_user, betaCommunity.community.id); - let searchBeta3 = await searchPostLocal(beta, postRes2.post_view.post); + let searchBeta3 = await waitUntil( + () => searchPostLocal(beta, postRes2.post_view.post), + e => !!e.posts[0], + ); expect(searchBeta3.posts[0]).toBeDefined(); let alphaUserOnBeta2 = await resolvePerson(beta, alphaUserActorId); @@ -497,7 +544,12 @@ test("Report a post", async () => { await reportPost(alpha, alphaPost.post.id, randomString(10)) ).post_report_view.post_report; - let betaReport = (await listPostReports(beta)).post_reports[0].post_report; + let betaReport = ( + await waitUntil( + () => listPostReports(beta), + res => !!res.post_reports[0], + ) + ).post_reports[0].post_report; expect(betaReport).toBeDefined(); expect(betaReport.resolved).toBe(false); expect(betaReport.original_post_name).toBe(alphaReport.original_post_name); diff --git a/api_tests/src/private_message.spec.ts b/api_tests/src/private_message.spec.ts index a79fe2fb6..081bb8d8d 100644 --- a/api_tests/src/private_message.spec.ts +++ b/api_tests/src/private_message.spec.ts @@ -9,6 +9,7 @@ import { listPrivateMessages, deletePrivateMessage, unfollowRemotes, + waitUntil, } from "./shared"; let recipient_id: number; @@ -30,7 +31,10 @@ test("Create a private message", async () => { expect(pmRes.private_message_view.creator.local).toBe(true); expect(pmRes.private_message_view.recipient.local).toBe(false); - let betaPms = await listPrivateMessages(beta); + let betaPms = await waitUntil( + () => listPrivateMessages(beta), + e => !!e.private_messages[0], + ); expect(betaPms.private_messages[0].private_message.content).toBeDefined(); expect(betaPms.private_messages[0].private_message.local).toBe(false); expect(betaPms.private_messages[0].creator.local).toBe(false); @@ -49,7 +53,10 @@ test("Update a private message", async () => { updatedContent, ); - let betaPms = await listPrivateMessages(beta); + let betaPms = await waitUntil( + () => listPrivateMessages(beta), + p => p.private_messages[0].private_message.content === updatedContent, + ); expect(betaPms.private_messages[0].private_message.content).toBe( updatedContent, ); @@ -57,7 +64,15 @@ test("Update a private message", async () => { test("Delete a private message", async () => { let pmRes = await createPrivateMessage(alpha, recipient_id); - let betaPms1 = await listPrivateMessages(beta); + let betaPms1 = await waitUntil( + () => listPrivateMessages(beta), + m => + !!m.private_messages.find( + e => + e.private_message.ap_id === + pmRes.private_message_view.private_message.ap_id, + ), + ); let deletedPmRes = await deletePrivateMessage( alpha, true, @@ -68,7 +83,10 @@ test("Delete a private message", async () => { // The GetPrivateMessages filters out deleted, // even though they are in the actual database. // no reason to show them - let betaPms2 = await listPrivateMessages(beta); + let betaPms2 = await waitUntil( + () => listPrivateMessages(beta), + p => p.private_messages.length === betaPms1.private_messages.length - 1, + ); expect(betaPms2.private_messages.length).toBe( betaPms1.private_messages.length - 1, ); @@ -83,7 +101,10 @@ test("Delete a private message", async () => { false, ); - let betaPms3 = await listPrivateMessages(beta); + let betaPms3 = await waitUntil( + () => listPrivateMessages(beta), + p => p.private_messages.length === betaPms1.private_messages.length, + ); expect(betaPms3.private_messages.length).toBe( betaPms1.private_messages.length, ); diff --git a/api_tests/src/shared.ts b/api_tests/src/shared.ts index e465d1da2..d116d329f 100644 --- a/api_tests/src/shared.ts +++ b/api_tests/src/shared.ts @@ -201,6 +201,11 @@ export async function setupLogins() { try { await createCommunity(alpha, "main"); await createCommunity(beta, "main"); + // wait for > INSTANCES_RECHECK_DELAY to ensure federation is initialized + // otherwise the first few federated events may be missed + // (because last_successful_id is set to current id when federation to an instance is first started) + // only needed the first time so do in this try + await delay(6_000); } catch (_) { console.log("Communities already exist"); } @@ -212,7 +217,9 @@ export async function createPost( ): Promise { let name = randomString(5); let body = randomString(10); - let url = "https://google.com/"; + // switch from google.com to example.com for consistent title (embed_title and embed_description) + // google switches description when a google doodle appears + let url = "https://example.com/"; let form: CreatePost = { name, url, @@ -851,3 +858,20 @@ export function getCommentParentId(comment: Comment): number | undefined { return undefined; } } + +export async function waitUntil( + fetcher: () => Promise, + checker: (t: T) => boolean, + retries = 10, + delaySeconds = 2, +) { + let retry = 0; + while (retry++ < retries) { + const result = await fetcher(); + if (checker(result)) return result; + await delay(delaySeconds * 1000); + } + throw Error( + `Failed "${fetcher}": "${checker}" did not return true after ${retries} retries (delayed ${delaySeconds}s each)`, + ); +} diff --git a/api_tests/tsconfig.json b/api_tests/tsconfig.json index 9e2e0720e..c580114ed 100644 --- a/api_tests/tsconfig.json +++ b/api_tests/tsconfig.json @@ -6,7 +6,7 @@ "noImplicitAny": true, "lib": ["es2017", "es7", "es6", "dom"], "outDir": "./dist", - "target": "ES2015", + "target": "ES2020", "strictNullChecks": true, "moduleResolution": "Node" }, diff --git a/crates/api_common/src/send_activity.rs b/crates/api_common/src/send_activity.rs index 45109d5c5..84b2efb2d 100644 --- a/crates/api_common/src/send_activity.rs +++ b/crates/api_common/src/send_activity.rs @@ -17,22 +17,14 @@ use lemmy_db_schema::{ }, }; use lemmy_db_views::structs::PrivateMessageView; -use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION}; -use once_cell::sync::{Lazy, OnceCell}; -use tokio::{ - sync::{ - mpsc, - mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender}, - Mutex, - }, - task::JoinHandle, -}; +use lemmy_utils::error::LemmyResult; +use once_cell::sync::OnceCell; use url::Url; type MatchOutgoingActivitiesBoxed = Box fn(SendActivityData, &'a Data) -> BoxFuture<'a, LemmyResult<()>>>; -/// This static is necessary so that activities can be sent out synchronously for tests. +/// This static is necessary so that the api_common crates don't need to depend on lemmy_apub pub static MATCH_OUTGOING_ACTIVITIES: OnceCell = OnceCell::new(); #[derive(Debug)] @@ -62,51 +54,16 @@ pub enum SendActivityData { CreateReport(Url, Person, Community, String), } -// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with -// ctrl+c still works. -static ACTIVITY_CHANNEL: Lazy = Lazy::new(|| { - let (sender, receiver) = mpsc::unbounded_channel(); - let weak_sender = sender.downgrade(); - ActivityChannel { - weak_sender, - receiver: Mutex::new(receiver), - keepalive_sender: Mutex::new(Some(sender)), - } -}); - -pub struct ActivityChannel { - weak_sender: WeakUnboundedSender, - receiver: Mutex>, - keepalive_sender: Mutex>>, -} +pub struct ActivityChannel; impl ActivityChannel { - pub async fn retrieve_activity() -> Option { - let mut lock = ACTIVITY_CHANNEL.receiver.lock().await; - lock.recv().await - } - pub async fn submit_activity( data: SendActivityData, context: &Data, ) -> LemmyResult<()> { - if *SYNCHRONOUS_FEDERATION { - MATCH_OUTGOING_ACTIVITIES - .get() - .expect("retrieve function pointer")(data, context) - .await?; - } - // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender, - // not sure which way is more efficient - else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() { - sender.send(data)?; - } - Ok(()) - } - - pub async fn close(outgoing_activities_task: JoinHandle>) -> LemmyResult<()> { - ACTIVITY_CHANNEL.keepalive_sender.lock().await.take(); - outgoing_activities_task.await??; - Ok(()) + MATCH_OUTGOING_ACTIVITIES + .get() + .expect("retrieve function pointer")(data, context) + .await } } diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index d0b0f368c..8cc6ffe62 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -37,7 +37,6 @@ use lemmy_utils::{ slurs::{check_slurs, check_slurs_opt}, validation::{check_url_scheme, clean_url_params, is_valid_body_field, is_valid_post_title}, }, - SYNCHRONOUS_FEDERATION, }; use tracing::Instrument; use url::Url; @@ -190,11 +189,7 @@ pub async fn create_post( Err(e) => Err(e).with_lemmy_type(LemmyErrorType::CouldntSendWebmention), } }; - if *SYNCHRONOUS_FEDERATION { - task.await?; - } else { - spawn_try_task(task); - } + spawn_try_task(task); }; build_post_response(&context, community_id, person_id, post_id).await diff --git a/crates/apub/src/activities/block/block_user.rs b/crates/apub/src/activities/block/block_user.rs index 07fe0b75b..a4fd7c9b8 100644 --- a/crates/apub/src/activities/block/block_user.rs +++ b/crates/apub/src/activities/block/block_user.rs @@ -10,7 +10,7 @@ use crate::{ }, activity_lists::AnnouncableActivities, insert_received_activity, - objects::{instance::remote_instance_inboxes, person::ApubPerson}, + objects::person::ApubPerson, protocol::activities::block::block_user::BlockUser, }; use activitypub_federation::{ @@ -27,6 +27,7 @@ use lemmy_api_common::{ }; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, community::{ CommunityFollower, CommunityFollowerForm, @@ -97,12 +98,12 @@ impl BlockUser { match target { SiteOrCommunity::Site(_) => { - let inboxes = remote_instance_inboxes(&mut context.pool()).await?; + let inboxes = ActivitySendTargets::to_all_instances(); send_lemmy_activity(context, block, mod_, inboxes, false).await } SiteOrCommunity::Community(c) => { let activity = AnnouncableActivities::BlockUser(block); - let inboxes = vec![user.shared_inbox_or_inbox()]; + let inboxes = ActivitySendTargets::to_inbox(user.shared_inbox_or_inbox()); send_activity_in_community(activity, mod_, c, inboxes, true, context).await } } diff --git a/crates/apub/src/activities/block/undo_block_user.rs b/crates/apub/src/activities/block/undo_block_user.rs index 889d6a286..0796f86e7 100644 --- a/crates/apub/src/activities/block/undo_block_user.rs +++ b/crates/apub/src/activities/block/undo_block_user.rs @@ -8,7 +8,7 @@ use crate::{ }, activity_lists::AnnouncableActivities, insert_received_activity, - objects::{instance::remote_instance_inboxes, person::ApubPerson}, + objects::person::ApubPerson, protocol::activities::block::{block_user::BlockUser, undo_block_user::UndoBlockUser}, }; use activitypub_federation::{ @@ -20,6 +20,7 @@ use activitypub_federation::{ use lemmy_api_common::{context::LemmyContext, utils::sanitize_html_federation_opt}; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, community::{CommunityPersonBan, CommunityPersonBanForm}, moderator::{ModBan, ModBanForm, ModBanFromCommunity, ModBanFromCommunityForm}, person::{Person, PersonUpdateForm}, @@ -59,10 +60,10 @@ impl UndoBlockUser { audience, }; - let mut inboxes = vec![user.shared_inbox_or_inbox()]; + let mut inboxes = ActivitySendTargets::to_inbox(user.shared_inbox_or_inbox()); match target { SiteOrCommunity::Site(_) => { - inboxes.append(&mut remote_instance_inboxes(&mut context.pool()).await?); + inboxes.set_all_instances(); send_lemmy_activity(context, undo, mod_, inboxes, false).await } SiteOrCommunity::Community(c) => { diff --git a/crates/apub/src/activities/community/announce.rs b/crates/apub/src/activities/community/announce.rs index 9ead36188..5939c023a 100644 --- a/crates/apub/src/activities/community/announce.rs +++ b/crates/apub/src/activities/community/announce.rs @@ -21,6 +21,7 @@ use activitypub_federation::{ traits::{ActivityHandler, Actor}, }; use lemmy_api_common::context::LemmyContext; +use lemmy_db_schema::source::activity::ActivitySendTargets; use lemmy_utils::error::{LemmyError, LemmyErrorType}; use serde_json::Value; use url::Url; @@ -94,7 +95,7 @@ impl AnnounceActivity { context: &Data, ) -> Result<(), LemmyError> { let announce = AnnounceActivity::new(object.clone(), community, context)?; - let inboxes = community.get_follower_inboxes(context).await?; + let inboxes = ActivitySendTargets::to_local_community_followers(community.id); send_lemmy_activity(context, announce, community, inboxes.clone(), false).await?; // Pleroma and Mastodon can't handle activities like Announce/Create/Page. So for diff --git a/crates/apub/src/activities/community/collection_add.rs b/crates/apub/src/activities/community/collection_add.rs index 3d2daf4c7..49e84f593 100644 --- a/crates/apub/src/activities/community/collection_add.rs +++ b/crates/apub/src/activities/community/collection_add.rs @@ -28,6 +28,7 @@ use lemmy_db_schema::{ impls::community::CollectionType, newtypes::{CommunityId, PersonId}, source::{ + activity::ActivitySendTargets, community::{Community, CommunityModerator, CommunityModeratorForm}, moderator::{ModAddCommunity, ModAddCommunityForm}, person::Person, @@ -62,7 +63,7 @@ impl CollectionAdd { }; let activity = AnnouncableActivities::CollectionAdd(add); - let inboxes = vec![added_mod.shared_inbox_or_inbox()]; + let inboxes = ActivitySendTargets::to_inbox(added_mod.shared_inbox_or_inbox()); send_activity_in_community(activity, actor, community, inboxes, true, context).await } @@ -87,7 +88,15 @@ impl CollectionAdd { audience: Some(community.id().into()), }; let activity = AnnouncableActivities::CollectionAdd(add); - send_activity_in_community(activity, actor, community, vec![], true, context).await + send_activity_in_community( + activity, + actor, + community, + ActivitySendTargets::empty(), + true, + context, + ) + .await } } diff --git a/crates/apub/src/activities/community/collection_remove.rs b/crates/apub/src/activities/community/collection_remove.rs index 38ed95683..72d58c114 100644 --- a/crates/apub/src/activities/community/collection_remove.rs +++ b/crates/apub/src/activities/community/collection_remove.rs @@ -24,6 +24,7 @@ use lemmy_api_common::{ use lemmy_db_schema::{ impls::community::CollectionType, source::{ + activity::ActivitySendTargets, community::{Community, CommunityModerator, CommunityModeratorForm}, moderator::{ModAddCommunity, ModAddCommunityForm}, post::{Post, PostUpdateForm}, @@ -57,7 +58,7 @@ impl CollectionRemove { }; let activity = AnnouncableActivities::CollectionRemove(remove); - let inboxes = vec![removed_mod.shared_inbox_or_inbox()]; + let inboxes = ActivitySendTargets::to_inbox(removed_mod.shared_inbox_or_inbox()); send_activity_in_community(activity, actor, community, inboxes, true, context).await } @@ -82,7 +83,15 @@ impl CollectionRemove { audience: Some(community.id().into()), }; let activity = AnnouncableActivities::CollectionRemove(remove); - send_activity_in_community(activity, actor, community, vec![], true, context).await + send_activity_in_community( + activity, + actor, + community, + ActivitySendTargets::empty(), + true, + context, + ) + .await } } diff --git a/crates/apub/src/activities/community/lock_page.rs b/crates/apub/src/activities/community/lock_page.rs index 446b7824f..b6caef9dc 100644 --- a/crates/apub/src/activities/community/lock_page.rs +++ b/crates/apub/src/activities/community/lock_page.rs @@ -24,6 +24,7 @@ use activitypub_federation::{ use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, community::Community, person::Person, post::{Post, PostUpdateForm}, @@ -147,6 +148,14 @@ pub(crate) async fn send_lock_post( }; AnnouncableActivities::UndoLockPost(undo) }; - send_activity_in_community(activity, &actor.into(), &community, vec![], true, &context).await?; + send_activity_in_community( + activity, + &actor.into(), + &community, + ActivitySendTargets::empty(), + true, + &context, + ) + .await?; Ok(()) } diff --git a/crates/apub/src/activities/community/mod.rs b/crates/apub/src/activities/community/mod.rs index 7a88b34b3..c654a5c79 100644 --- a/crates/apub/src/activities/community/mod.rs +++ b/crates/apub/src/activities/community/mod.rs @@ -6,9 +6,8 @@ use crate::{ }; use activitypub_federation::{config::Data, traits::Actor}; use lemmy_api_common::context::LemmyContext; -use lemmy_db_schema::source::person::PersonFollower; +use lemmy_db_schema::source::{activity::ActivitySendTargets, person::PersonFollower}; use lemmy_utils::error::LemmyError; -use url::Url; pub mod announce; pub mod collection_add; @@ -34,7 +33,7 @@ pub(crate) async fn send_activity_in_community( activity: AnnouncableActivities, actor: &ApubPerson, community: &ApubCommunity, - extra_inboxes: Vec, + extra_inboxes: ActivitySendTargets, is_mod_action: bool, context: &Data, ) -> Result<(), LemmyError> { @@ -43,8 +42,8 @@ pub(crate) async fn send_activity_in_community( // send to user followers if !is_mod_action { - inboxes.extend( - &mut PersonFollower::list_followers(&mut context.pool(), actor.id) + inboxes.add_inboxes( + PersonFollower::list_followers(&mut context.pool(), actor.id) .await? .into_iter() .map(|p| ApubPerson(p).shared_inbox_or_inbox()), @@ -56,7 +55,7 @@ pub(crate) async fn send_activity_in_community( AnnounceActivity::send(activity.clone().try_into()?, community, context).await?; } else { // send to the community, which will then forward to followers - inboxes.push(community.shared_inbox_or_inbox()); + inboxes.add_inbox(community.shared_inbox_or_inbox()); } send_lemmy_activity(context, activity.clone(), actor, inboxes, false).await?; diff --git a/crates/apub/src/activities/community/report.rs b/crates/apub/src/activities/community/report.rs index 16f8c3bdf..d6c058448 100644 --- a/crates/apub/src/activities/community/report.rs +++ b/crates/apub/src/activities/community/report.rs @@ -14,6 +14,7 @@ use activitypub_federation::{ use lemmy_api_common::{context::LemmyContext, utils::sanitize_html_federation}; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, comment_report::{CommentReport, CommentReportForm}, community::Community, person::Person, @@ -49,8 +50,11 @@ impl Report { id: id.clone(), audience: Some(community.id().into()), }; - - let inbox = vec![community.shared_inbox_or_inbox()]; + let inbox = if community.local { + ActivitySendTargets::empty() + } else { + ActivitySendTargets::to_inbox(community.shared_inbox_or_inbox()) + }; send_lemmy_activity(&context, report, &actor, inbox, false).await } } diff --git a/crates/apub/src/activities/community/update.rs b/crates/apub/src/activities/community/update.rs index c3b2a2ae1..6a4fcbfdd 100644 --- a/crates/apub/src/activities/community/update.rs +++ b/crates/apub/src/activities/community/update.rs @@ -18,7 +18,7 @@ use activitypub_federation::{ }; use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ - source::{community::Community, person::Person}, + source::{activity::ActivitySendTargets, community::Community, person::Person}, traits::Crud, }; use lemmy_utils::error::LemmyError; @@ -46,7 +46,15 @@ pub(crate) async fn send_update_community( }; let activity = AnnouncableActivities::UpdateCommunity(update); - send_activity_in_community(activity, &actor, &community, vec![], true, &context).await + send_activity_in_community( + activity, + &actor, + &community, + ActivitySendTargets::empty(), + true, + &context, + ) + .await } #[async_trait::async_trait] diff --git a/crates/apub/src/activities/create_or_update/comment.rs b/crates/apub/src/activities/create_or_update/comment.rs index d54ca309e..502bc5e0c 100644 --- a/crates/apub/src/activities/create_or_update/comment.rs +++ b/crates/apub/src/activities/create_or_update/comment.rs @@ -31,6 +31,7 @@ use lemmy_db_schema::{ aggregates::structs::CommentAggregates, newtypes::PersonId, source::{ + activity::ActivitySendTargets, comment::{Comment, CommentLike, CommentLikeForm}, community::Community, person::Person, @@ -88,10 +89,10 @@ impl CreateOrUpdateNote { .map(|t| t.href.clone()) .map(ObjectId::from) .collect(); - let mut inboxes = vec![]; + let mut inboxes = ActivitySendTargets::empty(); for t in tagged_users { let person = t.dereference(&context).await?; - inboxes.push(person.shared_inbox_or_inbox()); + inboxes.add_inbox(person.shared_inbox_or_inbox()); } let activity = AnnouncableActivities::CreateOrUpdateComment(create_or_update); diff --git a/crates/apub/src/activities/create_or_update/post.rs b/crates/apub/src/activities/create_or_update/post.rs index 99e7a3d45..018f84da4 100644 --- a/crates/apub/src/activities/create_or_update/post.rs +++ b/crates/apub/src/activities/create_or_update/post.rs @@ -26,6 +26,7 @@ use lemmy_db_schema::{ aggregates::structs::PostAggregates, newtypes::PersonId, source::{ + activity::ActivitySendTargets, community::Community, person::Person, post::{Post, PostLike, PostLikeForm}, @@ -80,7 +81,7 @@ impl CreateOrUpdatePage { activity, &person, &community, - vec![], + ActivitySendTargets::empty(), is_mod_action, &context, ) diff --git a/crates/apub/src/activities/create_or_update/private_message.rs b/crates/apub/src/activities/create_or_update/private_message.rs index 77a430c3c..74f833051 100644 --- a/crates/apub/src/activities/create_or_update/private_message.rs +++ b/crates/apub/src/activities/create_or_update/private_message.rs @@ -13,6 +13,7 @@ use activitypub_federation::{ traits::{ActivityHandler, Actor, Object}, }; use lemmy_api_common::context::LemmyContext; +use lemmy_db_schema::source::activity::ActivitySendTargets; use lemmy_db_views::structs::PrivateMessageView; use lemmy_utils::error::LemmyError; use url::Url; @@ -38,7 +39,7 @@ pub(crate) async fn send_create_or_update_pm( .await?, kind, }; - let inbox = vec![recipient.shared_inbox_or_inbox()]; + let inbox = ActivitySendTargets::to_inbox(recipient.shared_inbox_or_inbox()); send_lemmy_activity(&context, create_or_update, &actor, inbox, true).await } diff --git a/crates/apub/src/activities/deletion/delete_user.rs b/crates/apub/src/activities/deletion/delete_user.rs index 66a402161..7a56bda90 100644 --- a/crates/apub/src/activities/deletion/delete_user.rs +++ b/crates/apub/src/activities/deletion/delete_user.rs @@ -1,7 +1,7 @@ use crate::{ activities::{generate_activity_id, send_lemmy_activity, verify_is_public, verify_person}, insert_received_activity, - objects::{instance::remote_instance_inboxes, person::ApubPerson}, + objects::person::ApubPerson, protocol::activities::deletion::delete_user::DeleteUser, }; use activitypub_federation::{ @@ -11,7 +11,7 @@ use activitypub_federation::{ traits::{ActivityHandler, Actor}, }; use lemmy_api_common::{context::LemmyContext, utils::purge_user_account}; -use lemmy_db_schema::source::person::Person; +use lemmy_db_schema::source::{activity::ActivitySendTargets, person::Person}; use lemmy_utils::error::LemmyError; use url::Url; @@ -36,7 +36,8 @@ pub async fn delete_user( remove_data: Some(delete_content), }; - let inboxes = remote_instance_inboxes(&mut context.pool()).await?; + let inboxes = ActivitySendTargets::to_all_instances(); + send_lemmy_activity(&context, delete, &actor, inboxes, true).await?; Ok(()) } diff --git a/crates/apub/src/activities/deletion/mod.rs b/crates/apub/src/activities/deletion/mod.rs index 23ecd3bd1..74112fb58 100644 --- a/crates/apub/src/activities/deletion/mod.rs +++ b/crates/apub/src/activities/deletion/mod.rs @@ -31,6 +31,7 @@ use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ newtypes::CommunityId, source::{ + activity::ActivitySendTargets, comment::{Comment, CommentUpdateForm}, community::{Community, CommunityUpdateForm}, person::Person, @@ -71,7 +72,7 @@ pub(crate) async fn send_apub_delete_in_community( activity, &actor, &community.into(), - vec![], + ActivitySendTargets::empty(), is_mod_action, context, ) @@ -103,7 +104,7 @@ pub(crate) async fn send_apub_delete_in_community_new( activity, &actor, &community.into(), - vec![], + ActivitySendTargets::empty(), is_mod_action, &context, ) @@ -123,9 +124,9 @@ pub(crate) async fn send_apub_delete_private_message( .into(); let deletable = DeletableObjects::PrivateMessage(pm.into()); - let inbox = vec![recipient.shared_inbox_or_inbox()]; + let inbox = ActivitySendTargets::to_inbox(recipient.shared_inbox_or_inbox()); if deleted { - let delete = Delete::new(actor, deletable, recipient.id(), None, None, &context)?; + let delete: Delete = Delete::new(actor, deletable, recipient.id(), None, None, &context)?; send_lemmy_activity(&context, delete, actor, inbox, true).await?; } else { let undo = UndoDelete::new(actor, deletable, recipient.id(), None, None, &context)?; diff --git a/crates/apub/src/activities/following/accept.rs b/crates/apub/src/activities/following/accept.rs index adaad51d1..381b05930 100644 --- a/crates/apub/src/activities/following/accept.rs +++ b/crates/apub/src/activities/following/accept.rs @@ -10,7 +10,10 @@ use activitypub_federation::{ traits::{ActivityHandler, Actor}, }; use lemmy_api_common::context::LemmyContext; -use lemmy_db_schema::{source::community::CommunityFollower, traits::Followable}; +use lemmy_db_schema::{ + source::{activity::ActivitySendTargets, community::CommunityFollower}, + traits::Followable, +}; use lemmy_utils::error::LemmyError; use url::Url; @@ -29,7 +32,7 @@ impl AcceptFollow { &context.settings().get_protocol_and_hostname(), )?, }; - let inbox = vec![person.shared_inbox_or_inbox()]; + let inbox = ActivitySendTargets::to_inbox(person.shared_inbox_or_inbox()); send_lemmy_activity(context, accept, &user_or_community, inbox, true).await } } diff --git a/crates/apub/src/activities/following/follow.rs b/crates/apub/src/activities/following/follow.rs index d64041b94..6f6e2718f 100644 --- a/crates/apub/src/activities/following/follow.rs +++ b/crates/apub/src/activities/following/follow.rs @@ -19,6 +19,7 @@ use activitypub_federation::{ use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, community::{CommunityFollower, CommunityFollowerForm}, person::{PersonFollower, PersonFollowerForm}, }, @@ -61,7 +62,11 @@ impl Follow { .ok(); let follow = Follow::new(actor, community, context)?; - let inbox = vec![community.shared_inbox_or_inbox()]; + let inbox = if community.local { + ActivitySendTargets::empty() + } else { + ActivitySendTargets::to_inbox(community.shared_inbox_or_inbox()) + }; send_lemmy_activity(context, follow, actor, inbox, true).await } } diff --git a/crates/apub/src/activities/following/undo_follow.rs b/crates/apub/src/activities/following/undo_follow.rs index c36b36df8..2f1c5a76b 100644 --- a/crates/apub/src/activities/following/undo_follow.rs +++ b/crates/apub/src/activities/following/undo_follow.rs @@ -14,6 +14,7 @@ use activitypub_federation::{ use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ source::{ + activity::ActivitySendTargets, community::{CommunityFollower, CommunityFollowerForm}, person::{PersonFollower, PersonFollowerForm}, }, @@ -40,7 +41,11 @@ impl UndoFollow { &context.settings().get_protocol_and_hostname(), )?, }; - let inbox = vec![community.shared_inbox_or_inbox()]; + let inbox = if community.local { + ActivitySendTargets::empty() + } else { + ActivitySendTargets::to_inbox(community.shared_inbox_or_inbox()) + }; send_lemmy_activity(context, undo, actor, inbox, true).await } } diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index cddb5906b..29ec5bd30 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -26,7 +26,6 @@ use crate::{ CONTEXT, }; use activitypub_federation::{ - activity_queue::send_activity, config::Data, fetch::object_id::ObjectId, kinds::public, @@ -34,28 +33,21 @@ use activitypub_federation::{ traits::{ActivityHandler, Actor}, }; use anyhow::anyhow; -use lemmy_api_common::{ - context::LemmyContext, - send_activity::{ActivityChannel, SendActivityData}, -}; +use lemmy_api_common::{context::LemmyContext, send_activity::SendActivityData}; use lemmy_db_schema::{ newtypes::CommunityId, source::{ - activity::{SentActivity, SentActivityForm}, + activity::{ActivitySendTargets, ActorType, SentActivity, SentActivityForm}, community::Community, - instance::Instance, }, }; use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView}; use lemmy_utils::{ error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult}, spawn_try_task, - SYNCHRONOUS_FEDERATION, }; -use moka::future::Cache; -use once_cell::sync::Lazy; use serde::Serialize; -use std::{ops::Deref, sync::Arc, time::Duration}; +use std::{ops::Deref, time::Duration}; use tracing::info; use url::{ParseError, Url}; use uuid::Uuid; @@ -189,35 +181,23 @@ where Url::parse(&id) } +pub(crate) trait GetActorType { + fn actor_type(&self) -> ActorType; +} + #[tracing::instrument(skip_all)] async fn send_lemmy_activity( data: &Data, activity: Activity, actor: &ActorT, - mut inbox: Vec, + send_targets: ActivitySendTargets, sensitive: bool, ) -> Result<(), LemmyError> where Activity: ActivityHandler + Serialize + Send + Sync + Clone, - ActorT: Actor, + ActorT: Actor + GetActorType, Activity: ActivityHandler, { - static CACHE: Lazy>>> = Lazy::new(|| { - Cache::builder() - .max_capacity(1) - .time_to_live(DEAD_INSTANCE_LIST_CACHE_DURATION) - .build() - }); - let dead_instances = CACHE - .try_get_with((), async { - Ok::<_, diesel::result::Error>(Arc::new(Instance::dead_instances(&mut data.pool()).await?)) - }) - .await?; - - inbox.retain(|i| { - let domain = i.domain().expect("has domain").to_string(); - !dead_instances.contains(&domain) - }); info!("Sending activity {}", activity.id().to_string()); let activity = WithContext::new(activity, CONTEXT.deref().clone()); @@ -225,20 +205,21 @@ where ap_id: activity.id().clone().into(), data: serde_json::to_value(activity.clone())?, sensitive, + send_inboxes: send_targets + .inboxes + .into_iter() + .map(|e| Some(e.into())) + .collect(), + send_all_instances: send_targets.all_instances, + send_community_followers_of: send_targets.community_followers_of.map(|e| e.0), + actor_type: actor.actor_type(), + actor_apub_id: actor.id().into(), }; SentActivity::create(&mut data.pool(), form).await?; - send_activity(activity, actor, inbox, data).await?; Ok(()) } -pub async fn handle_outgoing_activities(context: Data) -> LemmyResult<()> { - while let Some(data) = ActivityChannel::retrieve_activity().await { - match_outgoing_activities(data, &context.reset_request_count()).await? - } - Ok(()) -} - pub async fn match_outgoing_activities( data: SendActivityData, context: &Data, @@ -343,10 +324,6 @@ pub async fn match_outgoing_activities( } } }; - if *SYNCHRONOUS_FEDERATION { - fed_task.await?; - } else { - spawn_try_task(fed_task); - } + spawn_try_task(fed_task); Ok(()) } diff --git a/crates/apub/src/activities/voting/mod.rs b/crates/apub/src/activities/voting/mod.rs index 742a4407c..c60235c05 100644 --- a/crates/apub/src/activities/voting/mod.rs +++ b/crates/apub/src/activities/voting/mod.rs @@ -13,6 +13,7 @@ use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ newtypes::DbUrl, source::{ + activity::ActivitySendTargets, comment::{CommentLike, CommentLikeForm}, community::Community, person::Person, @@ -36,17 +37,18 @@ pub(crate) async fn send_like_activity( let actor: ApubPerson = actor.into(); let community: ApubCommunity = community.into(); + let empty = ActivitySendTargets::empty(); // score of 1 means upvote, -1 downvote, 0 undo a previous vote if score != 0 { let vote = Vote::new(object_id, &actor, &community, score.try_into()?, &context)?; let activity = AnnouncableActivities::Vote(vote); - send_activity_in_community(activity, &actor, &community, vec![], false, &context).await + send_activity_in_community(activity, &actor, &community, empty, false, &context).await } else { // Lemmy API doesnt distinguish between Undo/Like and Undo/Dislike, so we hardcode it here. let vote = Vote::new(object_id, &actor, &community, VoteType::Like, &context)?; let undo_vote = UndoVote::new(vote, &actor, &community, &context)?; let activity = AnnouncableActivities::UndoVote(undo_vote); - send_activity_in_community(activity, &actor, &community, vec![], false, &context).await + send_activity_in_community(activity, &actor, &community, empty, false, &context).await } } diff --git a/crates/apub/src/fetcher/mod.rs b/crates/apub/src/fetcher/mod.rs index f56f1ccad..4e30b4b16 100644 --- a/crates/apub/src/fetcher/mod.rs +++ b/crates/apub/src/fetcher/mod.rs @@ -12,6 +12,7 @@ use lemmy_utils::error::LemmyError; pub mod post_or_comment; pub mod search; +pub mod site_or_community_or_user; pub mod user_or_community; /// Resolve actor identifier like `!news@example.com` to user or community object. diff --git a/crates/apub/src/fetcher/site_or_community_or_user.rs b/crates/apub/src/fetcher/site_or_community_or_user.rs new file mode 100644 index 000000000..76ee566c9 --- /dev/null +++ b/crates/apub/src/fetcher/site_or_community_or_user.rs @@ -0,0 +1,108 @@ +use crate::{ + fetcher::user_or_community::{PersonOrGroup, UserOrCommunity}, + objects::instance::ApubSite, + protocol::objects::instance::Instance, +}; +use activitypub_federation::{ + config::Data, + traits::{Actor, Object}, +}; +use chrono::{DateTime, Utc}; +use lemmy_api_common::context::LemmyContext; +use lemmy_utils::error::LemmyError; +use reqwest::Url; +use serde::{Deserialize, Serialize}; + +// todo: maybe this enum should be somewhere else? +#[derive(Debug)] +pub enum SiteOrCommunityOrUser { + Site(ApubSite), + UserOrCommunity(UserOrCommunity), +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(untagged)] +pub enum SiteOrPersonOrGroup { + Instance(Instance), + PersonOrGroup(PersonOrGroup), +} + +#[async_trait::async_trait] +impl Object for SiteOrCommunityOrUser { + type DataType = LemmyContext; + type Kind = SiteOrPersonOrGroup; + type Error = LemmyError; + + fn last_refreshed_at(&self) -> Option> { + Some(match self { + SiteOrCommunityOrUser::Site(p) => p.last_refreshed_at, + SiteOrCommunityOrUser::UserOrCommunity(p) => p.last_refreshed_at()?, + }) + } + + #[tracing::instrument(skip_all)] + async fn read_from_id( + _object_id: Url, + _data: &Data, + ) -> Result, LemmyError> { + unimplemented!(); + } + + #[tracing::instrument(skip_all)] + async fn delete(self, data: &Data) -> Result<(), LemmyError> { + match self { + SiteOrCommunityOrUser::Site(p) => p.delete(data).await, + SiteOrCommunityOrUser::UserOrCommunity(p) => p.delete(data).await, + } + } + + async fn into_json(self, _data: &Data) -> Result { + unimplemented!() + } + + #[tracing::instrument(skip_all)] + async fn verify( + apub: &Self::Kind, + expected_domain: &Url, + data: &Data, + ) -> Result<(), LemmyError> { + match apub { + SiteOrPersonOrGroup::Instance(a) => ApubSite::verify(a, expected_domain, data).await, + SiteOrPersonOrGroup::PersonOrGroup(a) => { + UserOrCommunity::verify(a, expected_domain, data).await + } + } + } + + #[tracing::instrument(skip_all)] + async fn from_json(_apub: Self::Kind, _data: &Data) -> Result { + unimplemented!(); + } +} + +impl Actor for SiteOrCommunityOrUser { + fn id(&self) -> Url { + match self { + SiteOrCommunityOrUser::Site(u) => u.id(), + SiteOrCommunityOrUser::UserOrCommunity(c) => c.id(), + } + } + + fn public_key_pem(&self) -> &str { + match self { + SiteOrCommunityOrUser::Site(p) => p.public_key_pem(), + SiteOrCommunityOrUser::UserOrCommunity(p) => p.public_key_pem(), + } + } + + fn private_key_pem(&self) -> Option { + match self { + SiteOrCommunityOrUser::Site(p) => p.private_key_pem(), + SiteOrCommunityOrUser::UserOrCommunity(p) => p.private_key_pem(), + } + } + + fn inbox(&self) -> Url { + unimplemented!() + } +} diff --git a/crates/apub/src/fetcher/user_or_community.rs b/crates/apub/src/fetcher/user_or_community.rs index 8ce188ca6..93e955c7b 100644 --- a/crates/apub/src/fetcher/user_or_community.rs +++ b/crates/apub/src/fetcher/user_or_community.rs @@ -1,4 +1,5 @@ use crate::{ + activities::GetActorType, objects::{community::ApubCommunity, person::ApubPerson}, protocol::objects::{group::Group, person::Person}, }; @@ -8,6 +9,7 @@ use activitypub_federation::{ }; use chrono::{DateTime, Utc}; use lemmy_api_common::context::LemmyContext; +use lemmy_db_schema::source::activity::ActorType; use lemmy_utils::error::LemmyError; use serde::{Deserialize, Serialize}; use url::Url; @@ -119,3 +121,12 @@ impl Actor for UserOrCommunity { unimplemented!() } } + +impl GetActorType for UserOrCommunity { + fn actor_type(&self) -> ActorType { + match self { + UserOrCommunity::User(p) => p.actor_type(), + UserOrCommunity::Community(p) => p.actor_type(), + } + } +} diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index 985210b36..1ae214c19 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -14,7 +14,7 @@ use std::{sync::Arc, time::Duration}; use url::Url; pub mod activities; -pub(crate) mod activity_lists; +pub mod activity_lists; pub mod api; pub(crate) mod collections; pub mod fetcher; diff --git a/crates/apub/src/objects/community.rs b/crates/apub/src/objects/community.rs index 31055efd8..3e8c62746 100644 --- a/crates/apub/src/objects/community.rs +++ b/crates/apub/src/objects/community.rs @@ -1,4 +1,5 @@ use crate::{ + activities::GetActorType, check_apub_id_valid, local_site_data_cached, objects::instance::fetch_instance_actor_for_object, @@ -20,6 +21,7 @@ use lemmy_api_common::{ }; use lemmy_db_schema::{ source::{ + activity::ActorType, actor_language::CommunityLanguage, community::{Community, CommunityUpdateForm}, }, @@ -181,6 +183,12 @@ impl Actor for ApubCommunity { } } +impl GetActorType for ApubCommunity { + fn actor_type(&self) -> ActorType { + ActorType::Community + } +} + impl ApubCommunity { /// For a given community, returns the inboxes of all followers. #[tracing::instrument(skip_all)] diff --git a/crates/apub/src/objects/instance.rs b/crates/apub/src/objects/instance.rs index 72c21e518..5949e12bf 100644 --- a/crates/apub/src/objects/instance.rs +++ b/crates/apub/src/objects/instance.rs @@ -1,4 +1,5 @@ use crate::{ + activities::GetActorType, check_apub_id_valid_with_strictness, local_site_data_cached, objects::read_from_string_or_source_opt, @@ -23,12 +24,13 @@ use lemmy_api_common::{ use lemmy_db_schema::{ newtypes::InstanceId, source::{ + activity::ActorType, actor_language::SiteLanguage, instance::Instance as DbInstance, site::{Site, SiteInsertForm}, }, traits::Crud, - utils::{naive_now, DbPool}, + utils::naive_now, }; use lemmy_utils::{ error::LemmyError, @@ -175,6 +177,11 @@ impl Actor for ApubSite { self.inbox_url.clone().into() } } +impl GetActorType for ApubSite { + fn actor_type(&self) -> ActorType { + ActorType::Site + } +} /// Try to fetch the instance actor (to make things like instance rules available). pub(in crate::objects) async fn fetch_instance_actor_for_object + Clone>( @@ -201,16 +208,6 @@ pub(in crate::objects) async fn fetch_instance_actor_for_object + C } } -pub(crate) async fn remote_instance_inboxes(pool: &mut DbPool<'_>) -> Result, LemmyError> { - Ok( - Site::read_remote_sites(pool) - .await? - .into_iter() - .map(|s| ApubSite::from(s).shared_inbox_or_inbox()) - .collect(), - ) -} - #[cfg(test)] pub(crate) mod tests { #![allow(clippy::unwrap_used)] diff --git a/crates/apub/src/objects/person.rs b/crates/apub/src/objects/person.rs index c5468e432..34d824890 100644 --- a/crates/apub/src/objects/person.rs +++ b/crates/apub/src/objects/person.rs @@ -1,4 +1,5 @@ use crate::{ + activities::GetActorType, check_apub_id_valid_with_strictness, local_site_data_cached, objects::{instance::fetch_instance_actor_for_object, read_from_string_or_source_opt}, @@ -27,7 +28,10 @@ use lemmy_api_common::{ }, }; use lemmy_db_schema::{ - source::person::{Person as DbPerson, PersonInsertForm, PersonUpdateForm}, + source::{ + activity::ActorType, + person::{Person as DbPerson, PersonInsertForm, PersonUpdateForm}, + }, traits::{ApubActor, Crud}, utils::naive_now, }; @@ -205,6 +209,12 @@ impl Actor for ApubPerson { } } +impl GetActorType for ApubPerson { + fn actor_type(&self) -> ActorType { + ActorType::Person + } +} + #[cfg(test)] pub(crate) mod tests { #![allow(clippy::unwrap_used)] diff --git a/crates/db_schema/src/impls/activity.rs b/crates/db_schema/src/impls/activity.rs index 8c4a0a15d..16b0fca40 100644 --- a/crates/db_schema/src/impls/activity.rs +++ b/crates/db_schema/src/impls/activity.rs @@ -30,6 +30,11 @@ impl SentActivity { .first::(conn) .await } + pub async fn read(pool: &mut DbPool<'_>, object_id: i64) -> Result { + use crate::schema::sent_activity::dsl::sent_activity; + let conn = &mut get_conn(pool).await?; + sent_activity.find(object_id).first::(conn).await + } } impl ReceivedActivity { @@ -62,7 +67,7 @@ mod tests { #![allow(clippy::indexing_slicing)] use super::*; - use crate::utils::build_db_pool_for_tests; + use crate::{source::activity::ActorType, utils::build_db_pool_for_tests}; use serde_json::json; use serial_test::serial; use url::Url; @@ -102,6 +107,13 @@ mod tests { ap_id: ap_id.clone(), data: data.clone(), sensitive, + actor_apub_id: Url::parse("http://example.com/u/exampleuser") + .unwrap() + .into(), + actor_type: ActorType::Person, + send_all_instances: false, + send_community_followers_of: None, + send_inboxes: vec![], }; SentActivity::create(pool, form).await.unwrap(); diff --git a/crates/db_schema/src/impls/instance.rs b/crates/db_schema/src/impls/instance.rs index acd4c1431..7e162717c 100644 --- a/crates/db_schema/src/impls/instance.rs +++ b/crates/db_schema/src/impls/instance.rs @@ -6,11 +6,13 @@ use crate::{ utils::{functions::lower, get_conn, naive_now, now, DbPool}, }; use diesel::{ - dsl::insert_into, + dsl::{count_star, insert_into}, result::Error, sql_types::{Nullable, Timestamptz}, ExpressionMethods, + NullableExpressionMethods, QueryDsl, + SelectableHelper, }; use diesel_async::RunQueryDsl; @@ -62,15 +64,6 @@ impl Instance { .await } - pub async fn dead_instances(pool: &mut DbPool<'_>) -> Result, Error> { - let conn = &mut get_conn(pool).await?; - instance::table - .select(instance::domain) - .filter(coalesce(instance::updated, instance::published).lt(now() - 3.days())) - .get_results(conn) - .await - } - #[cfg(test)] pub async fn delete_all(pool: &mut DbPool<'_>) -> Result { let conn = &mut get_conn(pool).await?; @@ -94,6 +87,44 @@ impl Instance { .await } + /// returns a list of all instances, each with a flag of whether the instance is allowed or not and dead or not + /// ordered by id + pub async fn read_all_with_blocked_and_dead( + pool: &mut DbPool<'_>, + ) -> Result, Error> { + let conn = &mut get_conn(pool).await?; + let is_dead_expr = coalesce(instance::updated, instance::published).lt(now() - 3.days()); + // this needs to be done in two steps because the meaning of the "blocked" column depends on the existence + // of any value at all in the allowlist. (so a normal join wouldn't work) + let use_allowlist = federation_allowlist::table + .select(count_star().gt(0)) + .get_result::(conn) + .await?; + if use_allowlist { + instance::table + .left_join(federation_allowlist::table) + .select(( + Self::as_select(), + federation_allowlist::id.nullable().is_not_null(), + is_dead_expr, + )) + .order_by(instance::id) + .get_results::<(Self, bool, bool)>(conn) + .await + } else { + instance::table + .left_join(federation_blocklist::table) + .select(( + Self::as_select(), + federation_blocklist::id.nullable().is_null(), + is_dead_expr, + )) + .order_by(instance::id) + .get_results::<(Self, bool, bool)>(conn) + .await + } + } + pub async fn linked(pool: &mut DbPool<'_>) -> Result, Error> { let conn = &mut get_conn(pool).await?; instance::table diff --git a/crates/db_schema/src/impls/site.rs b/crates/db_schema/src/impls/site.rs index 85e2b1f25..7e9329afb 100644 --- a/crates/db_schema/src/impls/site.rs +++ b/crates/db_schema/src/impls/site.rs @@ -1,6 +1,6 @@ use crate::{ - newtypes::{DbUrl, SiteId}, - schema::site::dsl::{actor_id, id, site}, + newtypes::{DbUrl, InstanceId, SiteId}, + schema::site::dsl::{actor_id, id, instance_id, site}, source::{ actor_language::SiteLanguage, site::{Site, SiteInsertForm, SiteUpdateForm}, @@ -8,7 +8,7 @@ use crate::{ traits::Crud, utils::{get_conn, DbPool}, }; -use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl}; +use diesel::{dsl::insert_into, result::Error, ExpressionMethods, OptionalExtension, QueryDsl}; use diesel_async::RunQueryDsl; use url::Url; @@ -61,19 +61,29 @@ impl Crud for Site { } impl Site { + pub async fn read_from_instance_id( + pool: &mut DbPool<'_>, + _instance_id: InstanceId, + ) -> Result, Error> { + let conn = &mut get_conn(pool).await?; + site + .filter(instance_id.eq(_instance_id)) + .get_result(conn) + .await + .optional() + } pub async fn read_from_apub_id( pool: &mut DbPool<'_>, object_id: &DbUrl, ) -> Result, Error> { let conn = &mut get_conn(pool).await?; - Ok( - site - .filter(actor_id.eq(object_id)) - .first::(conn) - .await - .ok() - .map(Into::into), - ) + + site + .filter(actor_id.eq(object_id)) + .first::(conn) + .await + .optional() + .map(Into::into) } pub async fn read_remote_sites(pool: &mut DbPool<'_>) -> Result, Error> { diff --git a/crates/db_schema/src/newtypes.rs b/crates/db_schema/src/newtypes.rs index af652d219..555b98256 100644 --- a/crates/db_schema/src/newtypes.rs +++ b/crates/db_schema/src/newtypes.rs @@ -168,7 +168,7 @@ pub struct CustomEmojiId(i32); pub struct LtreeDef(pub String); #[repr(transparent)] -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug, Hash)] #[cfg_attr(feature = "full", derive(AsExpression, FromSqlRow))] #[cfg_attr(feature = "full", diesel(sql_type = diesel::sql_types::Text))] pub struct DbUrl(pub(crate) Box); @@ -255,3 +255,9 @@ impl TS for DbUrl { true } } + +impl InstanceId { + pub fn inner(self) -> i32 { + self.0 + } +} diff --git a/crates/db_schema/src/schema.rs b/crates/db_schema/src/schema.rs index 1ffc14116..62b6ea80a 100644 --- a/crates/db_schema/src/schema.rs +++ b/crates/db_schema/src/schema.rs @@ -1,6 +1,10 @@ // @generated automatically by Diesel CLI. pub mod sql_types { + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "actor_type_enum"))] + pub struct ActorTypeEnum; + #[derive(diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "listing_type_enum"))] pub struct ListingTypeEnum; @@ -299,6 +303,16 @@ diesel::table! { } } +diesel::table! { + federation_queue_state (id) { + id -> Int4, + instance_id -> Int4, + last_successful_id -> Int8, + fail_count -> Int4, + last_retry -> Timestamptz, + } +} + diesel::table! { image_upload (id) { id -> Int4, @@ -804,12 +818,20 @@ diesel::table! { } diesel::table! { + use diesel::sql_types::*; + use super::sql_types::ActorTypeEnum; + sent_activity (id) { id -> Int8, ap_id -> Text, data -> Json, sensitive -> Bool, published -> Timestamptz, + send_inboxes -> Array>, + send_community_followers_of -> Nullable, + send_all_instances -> Bool, + actor_type -> ActorTypeEnum, + actor_apub_id -> Nullable, } } @@ -904,6 +926,7 @@ diesel::joinable!(custom_emoji_keyword -> custom_emoji (custom_emoji_id)); diesel::joinable!(email_verification -> local_user (local_user_id)); diesel::joinable!(federation_allowlist -> instance (instance_id)); diesel::joinable!(federation_blocklist -> instance (instance_id)); +diesel::joinable!(federation_queue_state -> instance (instance_id)); diesel::joinable!(image_upload -> local_user (local_user_id)); diesel::joinable!(local_site -> site (site_id)); diesel::joinable!(local_site_rate_limit -> local_site (local_site_id)); @@ -979,6 +1002,7 @@ diesel::allow_tables_to_appear_in_same_query!( email_verification, federation_allowlist, federation_blocklist, + federation_queue_state, image_upload, instance, language, diff --git a/crates/db_schema/src/source/activity.rs b/crates/db_schema/src/source/activity.rs index eafb75ad2..fc4bb0ec5 100644 --- a/crates/db_schema/src/source/activity.rs +++ b/crates/db_schema/src/source/activity.rs @@ -1,7 +1,55 @@ -use crate::{newtypes::DbUrl, schema::sent_activity}; +use crate::{ + newtypes::{CommunityId, DbUrl}, + schema::sent_activity, +}; use chrono::{DateTime, Utc}; +use diesel::{sql_types::Nullable, Queryable}; use serde_json::Value; -use std::fmt::Debug; +use std::{collections::HashSet, fmt::Debug}; +use url::Url; + +#[derive(FromSqlRow, PartialEq, Eq, Debug, Default, Clone)] +/// describes where an activity should be sent +pub struct ActivitySendTargets { + /// send to these inboxes explicitly + pub inboxes: HashSet, + /// send to all followers of these local communities + pub community_followers_of: Option, + /// send to all remote instances + pub all_instances: bool, +} + +// todo: in different file? +impl ActivitySendTargets { + pub fn empty() -> ActivitySendTargets { + ActivitySendTargets::default() + } + pub fn to_inbox(url: Url) -> ActivitySendTargets { + let mut a = ActivitySendTargets::empty(); + a.inboxes.insert(url); + a + } + pub fn to_local_community_followers(id: CommunityId) -> ActivitySendTargets { + let mut a = ActivitySendTargets::empty(); + a.community_followers_of = Some(id); + a + } + pub fn to_all_instances() -> ActivitySendTargets { + let mut a = ActivitySendTargets::empty(); + a.all_instances = true; + a + } + pub fn set_all_instances(&mut self) { + self.all_instances = true; + } + + pub fn add_inbox(&mut self, inbox: Url) { + self.inboxes.insert(inbox); + } + pub fn add_inboxes(&mut self, inboxes: impl Iterator) { + self.inboxes.extend(inboxes); + } +} #[derive(PartialEq, Eq, Debug, Queryable)] #[diesel(table_name = sent_activity)] @@ -11,13 +59,32 @@ pub struct SentActivity { pub data: Value, pub sensitive: bool, pub published: DateTime, + pub send_inboxes: Vec>, + pub send_community_followers_of: Option, + pub send_all_instances: bool, + pub actor_type: ActorType, + pub actor_apub_id: Option, } + #[derive(Insertable)] #[diesel(table_name = sent_activity)] pub struct SentActivityForm { pub ap_id: DbUrl, pub data: Value, pub sensitive: bool, + pub send_inboxes: Vec>, + pub send_community_followers_of: Option, + pub send_all_instances: bool, + pub actor_type: ActorType, + pub actor_apub_id: DbUrl, +} + +#[derive(Clone, Copy, Debug, diesel_derive_enum::DbEnum, PartialEq, Eq)] +#[ExistingTypePath = "crate::schema::sql_types::ActorTypeEnum"] +pub enum ActorType { + Site, + Community, + Person, } #[derive(PartialEq, Eq, Debug, Queryable)] diff --git a/crates/db_schema/src/source/instance.rs b/crates/db_schema/src/source/instance.rs index a887ae32f..8714b317e 100644 --- a/crates/db_schema/src/source/instance.rs +++ b/crates/db_schema/src/source/instance.rs @@ -11,7 +11,7 @@ use typed_builder::TypedBuilder; #[skip_serializing_none] #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] -#[cfg_attr(feature = "full", derive(Queryable, Identifiable, TS))] +#[cfg_attr(feature = "full", derive(Queryable, Selectable, Identifiable, TS))] #[cfg_attr(feature = "full", diesel(table_name = instance))] #[cfg_attr(feature = "full", ts(export))] /// A federated instance / site. diff --git a/crates/db_schema/src/utils.rs b/crates/db_schema/src/utils.rs index f90bb5c3d..7593cfd41 100644 --- a/crates/db_schema/src/utils.rs +++ b/crates/db_schema/src/utils.rs @@ -396,6 +396,9 @@ pub mod functions { } sql_function!(fn lower(x: Text) -> Text); + + // really this function is variadic, this just adds the two-argument version + sql_function!(fn coalesce(x: diesel::sql_types::Nullable, y: T) -> T); } pub const DELETED_REPLACEMENT_TEXT: &str = "*Permanently Deleted*"; diff --git a/crates/db_views_actor/Cargo.toml b/crates/db_views_actor/Cargo.toml index 069013d71..20b69e56c 100644 --- a/crates/db_views_actor/Cargo.toml +++ b/crates/db_views_actor/Cargo.toml @@ -28,3 +28,4 @@ diesel-async = { workspace = true, features = [ serde = { workspace = true } serde_with = { workspace = true } ts-rs = { workspace = true, optional = true } +chrono.workspace = true diff --git a/crates/db_views_actor/src/community_follower_view.rs b/crates/db_views_actor/src/community_follower_view.rs index 7e8b24af3..b2eeda29e 100644 --- a/crates/db_views_actor/src/community_follower_view.rs +++ b/crates/db_views_actor/src/community_follower_view.rs @@ -1,21 +1,47 @@ use crate::structs::CommunityFollowerView; +use chrono::Utc; use diesel::{ dsl::{count_star, not}, result::Error, - sql_function, ExpressionMethods, QueryDsl, }; use diesel_async::RunQueryDsl; use lemmy_db_schema::{ - newtypes::{CommunityId, DbUrl, PersonId}, + newtypes::{CommunityId, DbUrl, InstanceId, PersonId}, schema::{community, community_follower, person}, - utils::{get_conn, DbPool}, + utils::{functions::coalesce, get_conn, DbPool}, }; -sql_function!(fn coalesce(x: diesel::sql_types::Nullable, y: diesel::sql_types::Text) -> diesel::sql_types::Text); - impl CommunityFollowerView { + /// return a list of local community ids and remote inboxes that at least one user of the given instance has followed + pub async fn get_instance_followed_community_inboxes( + pool: &mut DbPool<'_>, + instance_id: InstanceId, + published_since: chrono::DateTime, + ) -> Result, Error> { + let conn = &mut get_conn(pool).await?; + // In most cases this will fetch the same url many times (the shared inbox url) + // PG will only send a single copy to rust, but it has to scan through all follower rows (same as it was before). + // So on the PG side it would be possible to optimize this further by adding e.g. a new table community_followed_instances (community_id, instance_id) + // that would work for all instances that support fully shared inboxes. + // It would be a bit more complicated though to keep it in sync. + + community_follower::table + .inner_join(community::table) + .inner_join(person::table) + .filter(person::instance_id.eq(instance_id)) + .filter(community::local) // this should be a no-op since community_followers table only has local-person+remote-community or remote-person+local-community + .filter(not(person::local)) + .filter(community_follower::published.gt(published_since.naive_utc())) + .select(( + community::id, + coalesce(person::shared_inbox_url, person::inbox_url), + )) + .distinct() // only need each community_id, inbox combination once + .load::<(CommunityId, DbUrl)>(conn) + .await + } pub async fn get_community_follower_inboxes( pool: &mut DbPool<'_>, community_id: CommunityId, diff --git a/crates/federate/Cargo.toml b/crates/federate/Cargo.toml new file mode 100644 index 000000000..0c394d9f4 --- /dev/null +++ b/crates/federate/Cargo.toml @@ -0,0 +1,41 @@ +[package] +name = "lemmy_federate" +version.workspace = true +edition.workspace = true +description.workspace = true +license.workspace = true +homepage.workspace = true +documentation.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +lemmy_api_common.workspace = true +lemmy_apub.workspace = true +lemmy_db_schema = { workspace = true, features = ["full"] } +lemmy_db_views_actor.workspace = true +lemmy_utils.workspace = true + +activitypub_federation.workspace = true +anyhow.workspace = true +futures.workspace = true +chrono.workspace = true +diesel = { workspace = true, features = ["postgres", "chrono", "serde_json"] } +diesel-async = { workspace = true, features = ["deadpool", "postgres"] } +once_cell.workspace = true +reqwest.workspace = true +serde_json.workspace = true +serde.workspace = true +tokio = { workspace = true, features = ["full"] } +tracing.workspace = true + +async-trait = "0.1.71" +bytes = "1.4.0" +enum_delegate = "0.2.0" +moka = { version = "0.11.2", features = ["future"] } +openssl = "0.10.55" +reqwest-middleware = "0.2.2" +reqwest-tracing = "0.4.5" +tokio-util = "0.7.8" +tracing-subscriber = "0.3.17" diff --git a/crates/federate/src/federation_queue_state.rs b/crates/federate/src/federation_queue_state.rs new file mode 100644 index 000000000..8a3506121 --- /dev/null +++ b/crates/federate/src/federation_queue_state.rs @@ -0,0 +1,63 @@ +use crate::util::ActivityId; +use anyhow::Result; +use chrono::{DateTime, TimeZone, Utc}; +use diesel::prelude::*; +use diesel_async::RunQueryDsl; +use lemmy_db_schema::{ + newtypes::InstanceId, + utils::{get_conn, DbPool}, +}; + +#[derive(Queryable, Selectable, Insertable, AsChangeset, Clone)] +#[diesel(table_name = lemmy_db_schema::schema::federation_queue_state)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct FederationQueueState { + pub instance_id: InstanceId, + pub last_successful_id: ActivityId, // todo: i64 + pub fail_count: i32, + pub last_retry: DateTime, +} + +impl FederationQueueState { + /// load state or return a default empty value + pub async fn load( + pool: &mut DbPool<'_>, + instance_id_: InstanceId, + ) -> Result { + use lemmy_db_schema::schema::federation_queue_state::dsl::{ + federation_queue_state, + instance_id, + }; + let conn = &mut get_conn(pool).await?; + Ok( + federation_queue_state + .filter(instance_id.eq(&instance_id_)) + .select(FederationQueueState::as_select()) + .get_result(conn) + .await + .optional()? + .unwrap_or(FederationQueueState { + instance_id: instance_id_, + fail_count: 0, + last_retry: Utc.timestamp_nanos(0), + last_successful_id: -1, // this value is set to the most current id for new instances + }), + ) + } + pub async fn upsert(pool: &mut DbPool<'_>, state: &FederationQueueState) -> Result<()> { + use lemmy_db_schema::schema::federation_queue_state::dsl::{ + federation_queue_state, + instance_id, + }; + let conn = &mut get_conn(pool).await?; + + state + .insert_into(federation_queue_state) + .on_conflict(instance_id) + .do_update() + .set(state) + .execute(conn) + .await?; + Ok(()) + } +} diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs new file mode 100644 index 000000000..dad7daad6 --- /dev/null +++ b/crates/federate/src/lib.rs @@ -0,0 +1,207 @@ +use crate::{ + util::{retry_sleep_duration, CancellableTask}, + worker::InstanceWorker, +}; +use activitypub_federation::config::FederationConfig; +use chrono::{Local, Timelike}; +use federation_queue_state::FederationQueueState; +use lemmy_api_common::context::LemmyContext; +use lemmy_db_schema::{ + newtypes::InstanceId, + source::instance::Instance, + utils::{ActualDbPool, DbPool}, +}; +use std::{collections::HashMap, time::Duration}; +use tokio::{ + sync::mpsc::{unbounded_channel, UnboundedReceiver}, + time::sleep, +}; +use tokio_util::sync::CancellationToken; + +mod federation_queue_state; +mod util; +mod worker; + +static WORKER_EXIT_TIMEOUT: Duration = Duration::from_secs(30); +#[cfg(debug_assertions)] +static INSTANCES_RECHECK_DELAY: Duration = Duration::from_secs(5); +#[cfg(not(debug_assertions))] +static INSTANCES_RECHECK_DELAY: Duration = Duration::from_secs(60); + +pub struct Opts { + /// how many processes you are starting in total + pub process_count: i32, + /// the index of this process (1-based: 1 - process_count) + pub process_index: i32, +} + +async fn start_stop_federation_workers( + opts: Opts, + pool: ActualDbPool, + federation_config: FederationConfig, + cancel: CancellationToken, +) -> anyhow::Result<()> { + let mut workers = HashMap::>::new(); + + let (stats_sender, stats_receiver) = unbounded_channel(); + let exit_print = tokio::spawn(receive_print_stats(pool.clone(), stats_receiver)); + let pool2 = &mut DbPool::Pool(&pool); + let process_index = opts.process_index - 1; + let local_domain = federation_config.settings().get_hostname_without_port()?; + loop { + let mut total_count = 0; + let mut dead_count = 0; + let mut disallowed_count = 0; + for (instance, allowed, is_dead) in Instance::read_all_with_blocked_and_dead(pool2).await? { + if instance.domain == local_domain { + continue; + } + if instance.id.inner() % opts.process_count != process_index { + continue; + } + total_count += 1; + if !allowed { + disallowed_count += 1; + } + if is_dead { + dead_count += 1; + } + let should_federate = allowed && !is_dead; + if should_federate { + if workers.contains_key(&instance.id) { + if workers + .get(&instance.id) + .map(util::CancellableTask::has_ended) + .unwrap_or(false) + { + // task must have errored out, remove and recreated it + let worker = workers + .remove(&instance.id) + .expect("just checked contains_key"); + tracing::error!( + "worker for {} has stopped, recreating: {:?}", + instance.domain, + worker.cancel().await + ); + } else { + continue; + } + } + // create new worker + let stats_sender = stats_sender.clone(); + let context = federation_config.to_request_data(); + let pool = pool.clone(); + workers.insert( + instance.id, + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, |stop| async move { + InstanceWorker::init_and_loop( + instance, + context, + &mut DbPool::Pool(&pool), + stop, + stats_sender, + ) + .await?; + Ok(()) + }), + ); + } else if !should_federate { + if let Some(worker) = workers.remove(&instance.id) { + if let Err(e) = worker.cancel().await { + tracing::error!("error stopping worker: {e}"); + } + } + } + } + let worker_count = workers.len(); + tracing::info!("Federating to {worker_count}/{total_count} instances ({dead_count} dead, {disallowed_count} disallowed)"); + tokio::select! { + () = sleep(INSTANCES_RECHECK_DELAY) => {}, + _ = cancel.cancelled() => { break; } + } + } + drop(stats_sender); + tracing::warn!( + "Waiting for {} workers ({:.2?} max)", + workers.len(), + WORKER_EXIT_TIMEOUT + ); + // the cancel futures need to be awaited concurrently for the shutdown processes to be triggered concurrently + futures::future::join_all(workers.into_values().map(util::CancellableTask::cancel)).await; + exit_print.await?; + Ok(()) +} + +/// starts and stops federation workers depending on which instances are on db +/// await the returned future to stop/cancel all workers gracefully +pub fn start_stop_federation_workers_cancellable( + opts: Opts, + pool: ActualDbPool, + config: FederationConfig, +) -> CancellableTask<()> { + CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |c| { + start_stop_federation_workers(opts, pool, config, c) + }) +} + +/// every 60s, print the state for every instance. exits if the receiver is done (all senders dropped) +async fn receive_print_stats( + pool: ActualDbPool, + mut receiver: UnboundedReceiver<(String, FederationQueueState)>, +) { + let pool = &mut DbPool::Pool(&pool); + let mut printerval = tokio::time::interval(Duration::from_secs(60)); + printerval.tick().await; // skip first + let mut stats = HashMap::new(); + loop { + tokio::select! { + ele = receiver.recv() => { + let Some((domain, ele)) = ele else { + tracing::info!("done. quitting"); + print_stats(pool, &stats).await; + return; + }; + stats.insert(domain, ele); + }, + _ = printerval.tick() => { + print_stats(pool, &stats).await; + } + } + } +} + +async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap) { + let last_id = crate::util::get_latest_activity_id(pool).await; + let Ok(last_id) = last_id else { + tracing::error!("could not get last id"); + return; + }; + // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be considered up to date + tracing::info!( + "Federation state as of {}:", + Local::now() + .with_nanosecond(0) + .expect("0 is valid nanos") + .to_rfc3339() + ); + // todo: less noisy output (only output failing instances and summary for successful) + // todo: more stats (act/sec, avg http req duration) + let mut ok_count = 0; + for (domain, stat) in stats { + let behind = last_id - stat.last_successful_id; + if stat.fail_count > 0 { + tracing::info!( + "{}: Warning. {} behind, {} consecutive fails, current retry delay {:.2?}", + domain, + behind, + stat.fail_count, + retry_sleep_duration(stat.fail_count) + ); + } else if behind > 0 { + tracing::info!("{}: Ok. {} behind", domain, behind); + } else { + ok_count += 1; + } + } + tracing::info!("{ok_count} others up to date"); +} diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs new file mode 100644 index 000000000..4f260708d --- /dev/null +++ b/crates/federate/src/util.rs @@ -0,0 +1,198 @@ +use anyhow::{anyhow, Context, Result}; +use diesel::{ + prelude::*, + sql_types::{Bool, Int8}, +}; +use diesel_async::RunQueryDsl; +use lemmy_apub::{ + activity_lists::SharedInboxActivities, + fetcher::{site_or_community_or_user::SiteOrCommunityOrUser, user_or_community::UserOrCommunity}, +}; +use lemmy_db_schema::{ + source::{ + activity::{ActorType, SentActivity}, + community::Community, + person::Person, + site::Site, + }, + traits::ApubActor, + utils::{get_conn, DbPool}, +}; +use moka::future::Cache; +use once_cell::sync::Lazy; +use reqwest::Url; +use serde_json::Value; +use std::{ + future::Future, + pin::Pin, + sync::{Arc, RwLock}, + time::Duration, +}; +use tokio::{task::JoinHandle, time::sleep}; +use tokio_util::sync::CancellationToken; + +pub struct CancellableTask { + f: Pin> + Send + 'static>>, + ended: Arc>, +} + +impl CancellableTask { + /// spawn a task but with graceful shutdown + pub fn spawn( + timeout: Duration, + task: impl FnOnce(CancellationToken) -> F, + ) -> CancellableTask + where + F: Future> + Send + 'static, + { + let stop = CancellationToken::new(); + let task = task(stop.clone()); + let ended = Arc::new(RwLock::new(false)); + let ended_write = ended.clone(); + let task: JoinHandle> = tokio::spawn(async move { + match task.await { + Ok(o) => Ok(o), + Err(e) => { + *ended_write.write().expect("poisoned") = true; + Err(e) + } + } + }); + let abort = task.abort_handle(); + CancellableTask { + ended, + f: Box::pin(async move { + stop.cancel(); + tokio::select! { + r = task => { + Ok(r.context("could not join")??) + }, + _ = sleep(timeout) => { + abort.abort(); + tracing::warn!("Graceful shutdown timed out, aborting task"); + Err(anyhow!("task aborted due to timeout")) + } + } + }), + } + } + + /// cancel the cancel signal, wait for timeout for the task to stop gracefully, otherwise abort it + pub async fn cancel(self) -> Result { + self.f.await + } + pub fn has_ended(&self) -> bool { + *self.ended.read().expect("poisoned") + } +} + +/// assuming apub priv key and ids are immutable, then we don't need to have TTL +/// TODO: capacity should be configurable maybe based on memory use +pub(crate) async fn get_actor_cached( + pool: &mut DbPool<'_>, + actor_type: ActorType, + actor_apub_id: &Url, +) -> Result> { + static CACHE: Lazy>> = + Lazy::new(|| Cache::builder().max_capacity(10000).build()); + CACHE + .try_get_with(actor_apub_id.clone(), async { + let url = actor_apub_id.clone().into(); + let person = match actor_type { + ActorType::Site => SiteOrCommunityOrUser::Site( + Site::read_from_apub_id(pool, &url) + .await? + .context("apub site not found")? + .into(), + ), + ActorType::Community => SiteOrCommunityOrUser::UserOrCommunity(UserOrCommunity::Community( + Community::read_from_apub_id(pool, &url) + .await? + .context("apub community not found")? + .into(), + )), + ActorType::Person => SiteOrCommunityOrUser::UserOrCommunity(UserOrCommunity::User( + Person::read_from_apub_id(pool, &url) + .await? + .context("apub person not found")? + .into(), + )), + }; + Result::<_, anyhow::Error>::Ok(Arc::new(person)) + }) + .await + .map_err(|e| anyhow::anyhow!("err getting actor {actor_type:?} {actor_apub_id}: {e:?}")) +} + +/// this should maybe be a newtype like all the other PersonId CommunityId etc. +pub(crate) type ActivityId = i64; + +type CachedActivityInfo = Option>; +/// activities are immutable so cache does not need to have TTL +/// May return None if the corresponding id does not exist or is a received activity. +/// Holes in serials are expected behaviour in postgresql +/// todo: cache size should probably be configurable / dependent on desired memory usage +pub(crate) async fn get_activity_cached( + pool: &mut DbPool<'_>, + activity_id: ActivityId, +) -> Result { + static ACTIVITIES: Lazy> = + Lazy::new(|| Cache::builder().max_capacity(10000).build()); + ACTIVITIES + .try_get_with(activity_id, async { + let row = SentActivity::read(pool, activity_id) + .await + .optional() + .context("could not read activity")?; + let Some(mut row) = row else { + return anyhow::Result::<_, anyhow::Error>::Ok(None); + }; + // swap to avoid cloning + let mut data = Value::Null; + std::mem::swap(&mut row.data, &mut data); + let activity_actual: SharedInboxActivities = serde_json::from_value(data)?; + + Ok(Some(Arc::new((row, activity_actual)))) + }) + .await + .map_err(|e| anyhow::anyhow!("err getting activity: {e:?}")) +} + +/// return the most current activity id (with 1 second cache) +pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result { + static CACHE: Lazy> = Lazy::new(|| { + Cache::builder() + .time_to_live(Duration::from_secs(1)) + .build() + }); + CACHE + .try_get_with((), async { + let conn = &mut get_conn(pool).await?; + let seq: Sequence = + diesel::sql_query("select last_value, is_called from sent_activity_id_seq") + .get_result(conn) + .await?; + let latest_id = if seq.is_called { + seq.last_value as ActivityId + } else { + // if a PG sequence has never been used, last_value will actually be next_value + (seq.last_value - 1) as ActivityId + }; + anyhow::Result::<_, anyhow::Error>::Ok(latest_id as ActivityId) + }) + .await + .map_err(|e| anyhow::anyhow!("err getting id: {e:?}")) +} + +/// how long to sleep based on how many retries have already happened +pub(crate) fn retry_sleep_duration(retry_count: i32) -> Duration { + Duration::from_secs_f64(10.0 * 2.0_f64.powf(f64::from(retry_count))) +} + +#[derive(QueryableByName)] +struct Sequence { + #[diesel(sql_type = Int8)] + last_value: i64, // this value is bigint for some reason even if sequence is int4 + #[diesel(sql_type = Bool)] + is_called: bool, +} diff --git a/crates/federate/src/worker.rs b/crates/federate/src/worker.rs new file mode 100644 index 000000000..a2bdf33c2 --- /dev/null +++ b/crates/federate/src/worker.rs @@ -0,0 +1,312 @@ +use crate::{ + federation_queue_state::FederationQueueState, + util::{get_activity_cached, get_actor_cached, get_latest_activity_id, retry_sleep_duration}, +}; +use activitypub_federation::{activity_sending::SendActivityTask, config::Data}; +use anyhow::{Context, Result}; +use chrono::{DateTime, TimeZone, Utc}; +use lemmy_api_common::context::LemmyContext; +use lemmy_apub::activity_lists::SharedInboxActivities; +use lemmy_db_schema::{ + newtypes::{CommunityId, InstanceId}, + source::{activity::SentActivity, instance::Instance, site::Site}, + utils::DbPool, +}; +use lemmy_db_views_actor::structs::CommunityFollowerView; +use lemmy_utils::error::LemmyErrorExt2; +use once_cell::sync::Lazy; +use reqwest::Url; +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; +use tokio::{sync::mpsc::UnboundedSender, time::sleep}; +use tokio_util::sync::CancellationToken; +/// save state to db every n sends if there's no failures (otherwise state is saved after every attempt) +static CHECK_SAVE_STATE_EVERY_IT: i64 = 100; +static SAVE_STATE_EVERY_TIME: Duration = Duration::from_secs(60); +/// recheck for new federation work every n seconds +#[cfg(debug_assertions)] +static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(1); +#[cfg(not(debug_assertions))] +static WORK_FINISHED_RECHECK_DELAY: Duration = Duration::from_secs(30); +#[cfg(debug_assertions)] +static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = + Lazy::new(|| chrono::Duration::seconds(1)); +#[cfg(not(debug_assertions))] +static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy = + Lazy::new(|| chrono::Duration::minutes(1)); +static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy = + Lazy::new(|| chrono::Duration::hours(1)); +pub(crate) struct InstanceWorker { + instance: Instance, + // load site lazily because if an instance is first seen due to being on allowlist, + // the corresponding row in `site` may not exist yet since that is only added once + // `fetch_instance_actor_for_object` is called. + // (this should be unlikely to be relevant outside of the federation tests) + site_loaded: bool, + site: Option, + followed_communities: HashMap>, + stop: CancellationToken, + context: Data, + stats_sender: UnboundedSender<(String, FederationQueueState)>, + last_full_communities_fetch: DateTime, + last_incremental_communities_fetch: DateTime, + state: FederationQueueState, + last_state_insert: DateTime, +} + +impl InstanceWorker { + pub(crate) async fn init_and_loop( + instance: Instance, + context: Data, + pool: &mut DbPool<'_>, // in theory there's a ref to the pool in context, but i couldn't get that to work wrt lifetimes + stop: CancellationToken, + stats_sender: UnboundedSender<(String, FederationQueueState)>, + ) -> Result<(), anyhow::Error> { + let state = FederationQueueState::load(pool, instance.id).await?; + let mut worker = InstanceWorker { + instance, + site_loaded: false, + site: None, + followed_communities: HashMap::new(), + stop, + context, + stats_sender, + last_full_communities_fetch: Utc.timestamp_nanos(0), + last_incremental_communities_fetch: Utc.timestamp_nanos(0), + state, + last_state_insert: Utc.timestamp_nanos(0), + }; + worker.loop_until_stopped(pool).await + } + /// loop fetch new activities from db and send them to the inboxes of the given instances + /// this worker only returns if (a) there is an internal error or (b) the cancellation token is cancelled (graceful exit) + pub(crate) async fn loop_until_stopped( + &mut self, + pool: &mut DbPool<'_>, + ) -> Result<(), anyhow::Error> { + let save_state_every = chrono::Duration::from_std(SAVE_STATE_EVERY_TIME).expect("not negative"); + + self.update_communities(pool).await?; + self.initial_fail_sleep().await?; + while !self.stop.is_cancelled() { + self.loop_batch(pool).await?; + if self.stop.is_cancelled() { + break; + } + if (Utc::now() - self.last_state_insert) > save_state_every { + self.save_and_send_state(pool).await?; + } + self.update_communities(pool).await?; + } + // final update of state in db + self.save_and_send_state(pool).await?; + Ok(()) + } + + async fn initial_fail_sleep(&mut self) -> Result<()> { + // before starting queue, sleep remaining duration if last request failed + if self.state.fail_count > 0 { + let elapsed = (Utc::now() - self.state.last_retry).to_std()?; + let required = retry_sleep_duration(self.state.fail_count); + if elapsed >= required { + return Ok(()); + } + let remaining = required - elapsed; + tokio::select! { + () = sleep(remaining) => {}, + () = self.stop.cancelled() => {} + } + } + Ok(()) + } + async fn loop_batch(&mut self, pool: &mut DbPool<'_>) -> Result<()> { + let latest_id = get_latest_activity_id(pool).await?; + if self.state.last_successful_id == -1 { + // this is the initial creation (instance first seen) of the federation queue for this instance + // skip all past activities: + self.state.last_successful_id = latest_id; + // save here to ensure it's not read as 0 again later if no activities have happened + self.save_and_send_state(pool).await?; + } + let mut id = self.state.last_successful_id; + if id == latest_id { + // no more work to be done, wait before rechecking + tokio::select! { + () = sleep(WORK_FINISHED_RECHECK_DELAY) => {}, + () = self.stop.cancelled() => {} + } + return Ok(()); + } + let mut processed_activities = 0; + while id < latest_id + && processed_activities < CHECK_SAVE_STATE_EVERY_IT + && !self.stop.is_cancelled() + { + id += 1; + processed_activities += 1; + let Some(ele) = get_activity_cached(pool, id) + .await + .context("failed reading activity from db")? + else { + self.state.last_successful_id = id; + continue; + }; + if let Err(e) = self.send_retry_loop(pool, &ele.0, &ele.1).await { + tracing::warn!( + "sending {} errored internally, skipping activity: {:?}", + ele.0.ap_id, + e + ); + } + if self.stop.is_cancelled() { + return Ok(()); + } + // send success! + self.state.last_successful_id = id; + self.state.fail_count = 0; + } + Ok(()) + } + + // this function will return successfully when (a) send succeeded or (b) worker cancelled + // and will return an error if an internal error occurred (send errors cause an infinite loop) + async fn send_retry_loop( + &mut self, + pool: &mut DbPool<'_>, + activity: &SentActivity, + object: &SharedInboxActivities, + ) -> Result<()> { + let inbox_urls = self + .get_inbox_urls(pool, activity) + .await + .context("failed figuring out inbox urls")?; + if inbox_urls.is_empty() { + self.state.last_successful_id = activity.id; + return Ok(()); + } + let Some(actor_apub_id) = &activity.actor_apub_id else { + return Ok(()); // activity was inserted before persistent queue was activated + }; + let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id) + .await + .context("failed getting actor instance (was it marked deleted / removed?)")?; + + let inbox_urls = inbox_urls.into_iter().collect(); + let requests = SendActivityTask::prepare(object, actor.as_ref(), inbox_urls, &self.context) + .await + .into_anyhow()?; + for task in requests { + // usually only one due to shared inbox + tracing::info!("sending out {}", task); + while let Err(e) = task.sign_and_send(&self.context).await { + self.state.fail_count += 1; + self.state.last_retry = Utc::now(); + let retry_delay: Duration = retry_sleep_duration(self.state.fail_count); + tracing::info!( + "{}: retrying {} attempt {} with delay {retry_delay:.2?}. ({e})", + self.instance.domain, + activity.id, + self.state.fail_count + ); + self.save_and_send_state(pool).await?; + tokio::select! { + () = sleep(retry_delay) => {}, + () = self.stop.cancelled() => { + // save state to db and exit + return Ok(()); + } + } + } + } + Ok(()) + } + + /// get inbox urls of sending the given activity to the given instance + /// most often this will return 0 values (if instance doesn't care about the activity) + /// or 1 value (the shared inbox) + /// > 1 values only happens for non-lemmy software + async fn get_inbox_urls( + &mut self, + pool: &mut DbPool<'_>, + activity: &SentActivity, + ) -> Result> { + let mut inbox_urls: HashSet = HashSet::new(); + + if activity.send_all_instances { + if !self.site_loaded { + self.site = Site::read_from_instance_id(pool, self.instance.id).await?; + self.site_loaded = true; + } + if let Some(site) = &self.site { + // Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these activities. So handling it like this is fine. + inbox_urls.insert(site.inbox_url.inner().clone()); + } + } + if let Some(t) = &activity.send_community_followers_of { + if let Some(urls) = self.followed_communities.get(t) { + inbox_urls.extend(urls.iter().map(std::clone::Clone::clone)); + } + } + inbox_urls.extend( + activity + .send_inboxes + .iter() + .filter_map(std::option::Option::as_ref) + .filter_map(|u| (u.domain() == Some(&self.instance.domain)).then(|| u.inner().clone())), + ); + Ok(inbox_urls) + } + + async fn update_communities(&mut self, pool: &mut DbPool<'_>) -> Result<()> { + if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY { + // process removals every hour + (self.followed_communities, self.last_full_communities_fetch) = self + .get_communities(pool, self.instance.id, self.last_full_communities_fetch) + .await?; + self.last_incremental_communities_fetch = self.last_full_communities_fetch; + } + if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY { + // process additions every minute + let (news, time) = self + .get_communities( + pool, + self.instance.id, + self.last_incremental_communities_fetch, + ) + .await?; + self.followed_communities.extend(news); + self.last_incremental_communities_fetch = time; + } + Ok(()) + } + + /// get a list of local communities with the remote inboxes on the given instance that cares about them + async fn get_communities( + &mut self, + pool: &mut DbPool<'_>, + instance_id: InstanceId, + last_fetch: DateTime, + ) -> Result<(HashMap>, DateTime)> { + let new_last_fetch = Utc::now(); // update to time before fetch to ensure overlap + Ok(( + CommunityFollowerView::get_instance_followed_community_inboxes(pool, instance_id, last_fetch) + .await? + .into_iter() + .fold(HashMap::new(), |mut map, (c, u)| { + map.entry(c).or_insert_with(HashSet::new).insert(u.into()); + map + }), + new_last_fetch, + )) + } + async fn save_and_send_state(&mut self, pool: &mut DbPool<'_>) -> Result<()> { + self.last_state_insert = Utc::now(); + FederationQueueState::upsert(pool, &self.state).await?; + self + .stats_sender + .send((self.instance.domain.clone(), self.state.clone()))?; + Ok(()) + } +} diff --git a/crates/utils/src/error.rs b/crates/utils/src/error.rs index ac7e386f5..9326805fb 100644 --- a/crates/utils/src/error.rs +++ b/crates/utils/src/error.rs @@ -239,6 +239,7 @@ impl> LemmyErrorExt for Result { } pub trait LemmyErrorExt2 { fn with_lemmy_type(self, error_type: LemmyErrorType) -> Result; + fn into_anyhow(self) -> Result; } impl LemmyErrorExt2 for Result { @@ -248,6 +249,10 @@ impl LemmyErrorExt2 for Result { e }) } + // this function can't be an impl From or similar because it would conflict with one of the other broad Into<> implementations + fn into_anyhow(self) -> Result { + self.map_err(|e| e.inner) + } } #[cfg(test)] diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 1ef8a842c..c0553de31 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -18,7 +18,6 @@ pub mod version; use error::LemmyError; use futures::Future; -use once_cell::sync::Lazy; use std::time::Duration; use tracing::Instrument; @@ -38,16 +37,6 @@ macro_rules! location_info { }; } -/// if true, all federation should happen synchronously. useful for debugging and testing. -/// defaults to true on debug mode, false on releasemode -/// override to true by setting env LEMMY_SYNCHRONOUS_FEDERATION=1 -/// override to false by setting env LEMMY_SYNCHRONOUS_FEDERATION="" -pub static SYNCHRONOUS_FEDERATION: Lazy = Lazy::new(|| { - std::env::var("LEMMY_SYNCHRONOUS_FEDERATION") - .map(|s| !s.is_empty()) - .unwrap_or(cfg!(debug_assertions)) -}); - /// tokio::spawn, but accepts a future that may fail and also /// * logs errors /// * attaches the spawned task to the tracing span of the caller for better logging diff --git a/migrations/2023-08-01-115243_persistent-activity-queue/down.sql b/migrations/2023-08-01-115243_persistent-activity-queue/down.sql new file mode 100644 index 000000000..30800fe5b --- /dev/null +++ b/migrations/2023-08-01-115243_persistent-activity-queue/down.sql @@ -0,0 +1,13 @@ +ALTER TABLE sent_activity + DROP COLUMN send_inboxes, + DROP COLUMN send_community_followers_of, + DROP COLUMN send_all_instances, + DROP COLUMN actor_apub_id, + DROP COLUMN actor_type; + +DROP TYPE actor_type_enum; + +DROP TABLE federation_queue_state; + +DROP INDEX idx_community_follower_published; + diff --git a/migrations/2023-08-01-115243_persistent-activity-queue/up.sql b/migrations/2023-08-01-115243_persistent-activity-queue/up.sql new file mode 100644 index 000000000..fc40735f0 --- /dev/null +++ b/migrations/2023-08-01-115243_persistent-activity-queue/up.sql @@ -0,0 +1,32 @@ +CREATE TYPE actor_type_enum AS enum ( + 'site', + 'community', + 'person' +); + +-- actor_apub_id only null for old entries before this migration +ALTER TABLE sent_activity + ADD COLUMN send_inboxes text[] NOT NULL DEFAULT '{}', -- list of specific inbox urls + ADD COLUMN send_community_followers_of integer DEFAULT NULL, + ADD COLUMN send_all_instances boolean NOT NULL DEFAULT FALSE, + ADD COLUMN actor_type actor_type_enum NOT NULL DEFAULT 'person', + ADD COLUMN actor_apub_id text DEFAULT NULL; + +ALTER TABLE sent_activity + ALTER COLUMN send_inboxes DROP DEFAULT, + ALTER COLUMN send_community_followers_of DROP DEFAULT, + ALTER COLUMN send_all_instances DROP DEFAULT, + ALTER COLUMN actor_type DROP DEFAULT, + ALTER COLUMN actor_apub_id DROP DEFAULT; + +CREATE TABLE federation_queue_state ( + id serial PRIMARY KEY, + instance_id integer NOT NULL UNIQUE REFERENCES instance (id), + last_successful_id bigint NOT NULL, + fail_count integer NOT NULL, + last_retry timestamptz NOT NULL +); + +-- for incremental fetches of followers +CREATE INDEX idx_community_follower_published ON community_follower (published); + diff --git a/src/lib.rs b/src/lib.rs index 55135723b..c871af787 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,24 +16,26 @@ use crate::{ use activitypub_federation::config::{FederationConfig, FederationMiddleware}; use actix_cors::Cors; use actix_web::{ + dev::ServerHandle, middleware::{self, ErrorHandlers}, web::Data, App, HttpServer, Result, }; +use clap::{ArgAction, Parser}; use lemmy_api_common::{ context::LemmyContext, lemmy_db_views::structs::SiteView, request::build_user_agent, - send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES}, + send_activity::MATCH_OUTGOING_ACTIVITIES, utils::{ check_private_instance_and_federation_enabled, local_site_rate_limit_to_rate_limit_config, }, }; use lemmy_apub::{ - activities::{handle_outgoing_activities, match_outgoing_activities}, + activities::match_outgoing_activities, VerifyUrlData, FEDERATION_HTTP_FETCH_LIMIT, }; @@ -41,18 +43,19 @@ use lemmy_db_schema::{ source::secret::Secret, utils::{build_db_pool, get_database_url, run_migrations}, }; +use lemmy_federate::{start_stop_federation_workers_cancellable, Opts}; use lemmy_routes::{feeds, images, nodeinfo, webfinger}; use lemmy_utils::{ error::LemmyError, rate_limit::RateLimitCell, response::jsonify_plain_text_errors, - settings::SETTINGS, - SYNCHRONOUS_FEDERATION, + settings::{structs::Settings, SETTINGS}, }; use reqwest::Client; -use reqwest_middleware::ClientBuilder; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_tracing::TracingMiddleware; -use std::{env, thread, time::Duration}; +use std::{env, ops::Deref, thread, time::Duration}; +use tokio::signal::unix::SignalKind; use tracing::subscriber::set_global_default; use tracing_actix_web::TracingLogger; use tracing_error::ErrorLayer; @@ -66,15 +69,53 @@ use { prometheus_metrics::serve_prometheus, }; +#[derive(Parser, Debug)] +#[command( + version, + about = "A link aggregator for the fediverse", + long_about = "A link aggregator for the fediverse.\n\nThis is the Lemmy backend API server. This will connect to a PostgreSQL database, run any pending migrations and start accepting API requests." +)] +pub struct CmdArgs { + #[arg(long, default_value_t = false)] + /// Disables running scheduled tasks. + /// + /// If you are running multiple Lemmy server processes, + /// you probably want to disable scheduled tasks on all but one of the processes, + /// to avoid running the tasks more often than intended. + disable_scheduled_tasks: bool, + /// Whether or not to run the HTTP server. + /// + /// This can be used to run a Lemmy server process that only runs scheduled tasks. + #[arg(long, default_value_t = true, action=ArgAction::Set)] + http_server: bool, + /// Whether or not to emit outgoing ActivityPub messages. + /// + /// Set to true for a simple setup. Only set to false for horizontally scaled setups. + /// See https://join-lemmy.org/docs/administration/horizontal_scaling.html for detail. + #[arg(long, default_value_t = true, action=ArgAction::Set)] + federate_activities: bool, + /// The index of this outgoing federation process. + /// + /// Defaults to 1/1. If you want to split the federation workload onto n servers, run each server 1≤i≤n with these args: + /// --federate-process-index i --federate-process-count n + /// + /// Make you have exactly one server with each `i` running, otherwise federation will randomly send duplicates or nothing. + /// + /// See https://join-lemmy.org/docs/administration/horizontal_scaling.html for more detail. + #[arg(long, default_value_t = 1)] + federate_process_index: i32, + /// How many outgoing federation processes you are starting in total. + /// + /// If set, make sure to set --federate-process-index differently for each. + #[arg(long, default_value_t = 1)] + federate_process_count: i32, +} /// Max timeout for http requests pub(crate) const REQWEST_TIMEOUT: Duration = Duration::from_secs(10); /// Placing the main function in lib.rs allows other crates to import it and embed Lemmy -pub async fn start_lemmy_server() -> Result<(), LemmyError> { - let args: Vec = env::args().collect(); - - let scheduled_tasks_enabled = args.get(1) != Some(&"--disable-scheduled-tasks".to_string()); - +pub async fn start_lemmy_server(args: CmdArgs) -> Result<(), LemmyError> { + let scheduled_tasks_enabled = !args.disable_scheduled_tasks; let settings = SETTINGS.to_owned(); // Run the DB migrations @@ -152,21 +193,73 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { #[cfg(feature = "prometheus-metrics")] serve_prometheus(settings.prometheus.as_ref(), context.clone()); - let settings_bind = settings.clone(); - let federation_config = FederationConfig::builder() .domain(settings.hostname.clone()) .app_data(context.clone()) .client(client.clone()) .http_fetch_limit(FEDERATION_HTTP_FETCH_LIMIT) - .worker_count(settings.worker_count) - .retry_count(settings.retry_count) - .debug(*SYNCHRONOUS_FEDERATION) + .debug(cfg!(debug_assertions)) .http_signature_compat(true) .url_verifier(Box::new(VerifyUrlData(context.inner_pool().clone()))) .build() .await?; + MATCH_OUTGOING_ACTIVITIES + .set(Box::new(move |d, c| { + Box::pin(match_outgoing_activities(d, c)) + })) + .expect("set function pointer"); + + let server = if args.http_server { + Some(create_http_server( + federation_config.clone(), + settings.clone(), + federation_enabled, + pictrs_client, + )?) + } else { + None + }; + let federate = args.federate_activities.then(|| { + start_stop_federation_workers_cancellable( + Opts { + process_index: args.federate_process_index, + process_count: args.federate_process_count, + }, + pool.clone(), + federation_config.clone(), + ) + }); + let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; + let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; + + tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::warn!("Received ctrl-c, shutting down gracefully..."); + } + _ = interrupt.recv() => { + tracing::warn!("Received interrupt, shutting down gracefully..."); + } + _ = terminate.recv() => { + tracing::warn!("Received terminate, shutting down gracefully..."); + } + } + if let Some(server) = server { + server.stop(true).await; + } + if let Some(federate) = federate { + federate.cancel().await?; + } + + Ok(()) +} + +fn create_http_server( + federation_config: FederationConfig, + settings: Settings, + federation_enabled: bool, + pictrs_client: ClientWithMiddleware, +) -> Result { // this must come before the HttpServer creation // creates a middleware that populates http metrics for each path, method, and status code #[cfg(feature = "prometheus-metrics")] @@ -175,21 +268,16 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { .build() .expect("Should always be buildable"); - MATCH_OUTGOING_ACTIVITIES - .set(Box::new(move |d, c| { - Box::pin(match_outgoing_activities(d, c)) - })) - .expect("set function pointer"); - let request_data = federation_config.to_request_data(); - let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data)); - + let context: LemmyContext = federation_config.deref().clone(); + let rate_limit_cell = federation_config.settings_updated_channel().clone(); + let self_origin = settings.get_protocol_and_hostname(); // Create Http server with websocket support - HttpServer::new(move || { + let server = HttpServer::new(move || { let cors_origin = env::var("LEMMY_CORS_ORIGIN"); let cors_config = match (cors_origin, cfg!(debug_assertions)) { (Ok(origin), false) => Cors::default() .allowed_origin(&origin) - .allowed_origin(&settings.get_protocol_and_hostname()), + .allowed_origin(&self_origin), _ => Cors::default() .allow_any_origin() .allow_any_method() @@ -217,7 +305,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { // The routes app - .configure(|cfg| api_routes_http::config(cfg, rate_limit_cell)) + .configure(|cfg| api_routes_http::config(cfg, &rate_limit_cell)) .configure(|cfg| { if federation_enabled { lemmy_apub::http::routes::config(cfg); @@ -225,17 +313,15 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { } }) .configure(feeds::config) - .configure(|cfg| images::config(cfg, pictrs_client.clone(), rate_limit_cell)) + .configure(|cfg| images::config(cfg, pictrs_client.clone(), &rate_limit_cell)) .configure(nodeinfo::config) }) - .bind((settings_bind.bind, settings_bind.port))? - .run() - .await?; - - // Wait for outgoing apub sends to complete - ActivityChannel::close(outgoing_activities_task).await?; - - Ok(()) + .disable_signals() + .bind((settings.bind, settings.port))? + .run(); + let handle = server.handle(); + tokio::task::spawn(server); + Ok(handle) } pub fn init_logging(opentelemetry_url: &Option) -> Result<(), LemmyError> { diff --git a/src/main.rs b/src/main.rs index 5fc03ed02..a876f7f11 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,14 @@ -use lemmy_server::{init_logging, start_lemmy_server}; +use clap::Parser; +use lemmy_server::{init_logging, start_lemmy_server, CmdArgs}; use lemmy_utils::{error::LemmyError, settings::SETTINGS}; #[tokio::main] pub async fn main() -> Result<(), LemmyError> { init_logging(&SETTINGS.opentelemetry_url)?; + let args = CmdArgs::parse(); + #[cfg(not(feature = "embed-pictrs"))] - start_lemmy_server().await?; + start_lemmy_server(args).await?; #[cfg(feature = "embed-pictrs")] { let pictrs_port = &SETTINGS @@ -33,7 +36,7 @@ pub async fn main() -> Result<(), LemmyError> { })) .init::<&str>(None) .expect("initialize pictrs config"); - let (lemmy, pictrs) = tokio::join!(start_lemmy_server(), pict_rs::run()); + let (lemmy, pictrs) = tokio::join!(start_lemmy_server(args), pict_rs::run()); lemmy?; pictrs.expect("run pictrs"); } diff --git a/src/root_span_builder.rs b/src/root_span_builder.rs index 016074c7d..e062ade21 100644 --- a/src/root_span_builder.rs +++ b/src/root_span_builder.rs @@ -66,7 +66,6 @@ fn handle_error(span: Span, status_code: StatusCode, response_error: &dyn Respon // pre-formatting errors is a workaround for https://github.com/tokio-rs/tracing/issues/1565 let display_error = format!("{response_error}"); - let debug_error = format!("{response_error:?}"); tracing::info_span!( parent: None, @@ -74,12 +73,11 @@ fn handle_error(span: Span, status_code: StatusCode, response_error: &dyn Respon ) .in_scope(|| { if status_code.is_client_error() { - tracing::warn!("{}\n{}", display_error, debug_error); + tracing::warn!("{}", display_error); } else { - tracing::error!("{}\n{}", display_error, debug_error); + tracing::error!("{}", display_error); } }); span.record("exception.message", &tracing::field::display(display_error)); - span.record("exception.details", &tracing::field::display(debug_error)); }