diff --git a/internal/processing/stream/statusupdate.go b/internal/processing/stream/statusupdate.go
new file mode 100644
index 000000000..fd8e388ce
--- /dev/null
+++ b/internal/processing/stream/statusupdate.go
@@ -0,0 +1,38 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package stream
+
+import (
+ "encoding/json"
+ "fmt"
+
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+)
+
+// StatusUpdate streams the given edited status to any open, appropriate
+// streams belonging to the given account.
+func (p *Processor) StatusUpdate(s *apimodel.Status, account *gtsmodel.Account, streamTypes []string) error {
+ bytes, err := json.Marshal(s)
+ if err != nil {
+ return fmt.Errorf("error marshalling status to json: %s", err)
+ }
+
+ return p.toAccount(string(bytes), stream.EventTypeStatusUpdate, streamTypes, account.ID)
+}
diff --git a/internal/processing/stream/statusupdate_test.go b/internal/processing/stream/statusupdate_test.go
new file mode 100644
index 000000000..7b987b412
--- /dev/null
+++ b/internal/processing/stream/statusupdate_test.go
@@ -0,0 +1,137 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package stream_test
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "testing"
+
+ "github.com/stretchr/testify/suite"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
+)
+
+type StatusUpdateTestSuite struct {
+ StreamTestSuite
+}
+
+func (suite *StatusUpdateTestSuite) TestStreamNotification() {
+ account := suite.testAccounts["local_account_1"]
+
+ openStream, errWithCode := suite.streamProcessor.Open(context.Background(), account, "user")
+ suite.NoError(errWithCode)
+
+ editedStatus := suite.testStatuses["remote_account_1_status_1"]
+ apiStatus, err := typeutils.NewConverter(&suite.state).StatusToAPIStatus(context.Background(), editedStatus, account)
+ suite.NoError(err)
+
+ err = suite.streamProcessor.StatusUpdate(apiStatus, account, []string{stream.TimelineHome})
+ suite.NoError(err)
+
+ msg := <-openStream.Messages
+ dst := new(bytes.Buffer)
+ err = json.Indent(dst, []byte(msg.Payload), "", " ")
+ suite.NoError(err)
+ suite.Equal(`{
+ "id": "01FVW7JHQFSFK166WWKR8CBA6M",
+ "created_at": "2021-09-20T10:40:37.000Z",
+ "in_reply_to_id": null,
+ "in_reply_to_account_id": null,
+ "sensitive": false,
+ "spoiler_text": "",
+ "visibility": "unlisted",
+ "language": "en",
+ "uri": "http://fossbros-anonymous.io/users/foss_satan/statuses/01FVW7JHQFSFK166WWKR8CBA6M",
+ "url": "http://fossbros-anonymous.io/@foss_satan/statuses/01FVW7JHQFSFK166WWKR8CBA6M",
+ "replies_count": 0,
+ "reblogs_count": 0,
+ "favourites_count": 0,
+ "favourited": false,
+ "reblogged": false,
+ "muted": false,
+ "bookmarked": false,
+ "pinned": false,
+ "content": "dark souls status bot: \"thoughts of dog\"",
+ "reblog": null,
+ "account": {
+ "id": "01F8MH5ZK5VRH73AKHQM6Y9VNX",
+ "username": "foss_satan",
+ "acct": "foss_satan@fossbros-anonymous.io",
+ "display_name": "big gerald",
+ "locked": false,
+ "discoverable": true,
+ "bot": false,
+ "created_at": "2021-09-26T10:52:36.000Z",
+ "note": "i post about like, i dunno, stuff, or whatever!!!!",
+ "url": "http://fossbros-anonymous.io/@foss_satan",
+ "avatar": "",
+ "avatar_static": "",
+ "header": "http://localhost:8080/assets/default_header.png",
+ "header_static": "http://localhost:8080/assets/default_header.png",
+ "followers_count": 0,
+ "following_count": 0,
+ "statuses_count": 3,
+ "last_status_at": "2021-09-11T09:40:37.000Z",
+ "emojis": [],
+ "fields": []
+ },
+ "media_attachments": [
+ {
+ "id": "01FVW7RXPQ8YJHTEXYPE7Q8ZY0",
+ "type": "image",
+ "url": "http://localhost:8080/fileserver/01F8MH5ZK5VRH73AKHQM6Y9VNX/attachment/original/01FVW7RXPQ8YJHTEXYPE7Q8ZY0.jpg",
+ "text_url": "http://localhost:8080/fileserver/01F8MH5ZK5VRH73AKHQM6Y9VNX/attachment/original/01FVW7RXPQ8YJHTEXYPE7Q8ZY0.jpg",
+ "preview_url": "http://localhost:8080/fileserver/01F8MH5ZK5VRH73AKHQM6Y9VNX/attachment/small/01FVW7RXPQ8YJHTEXYPE7Q8ZY0.jpg",
+ "remote_url": "http://fossbros-anonymous.io/attachments/original/13bbc3f8-2b5e-46ea-9531-40b4974d9912.jpg",
+ "preview_remote_url": "http://fossbros-anonymous.io/attachments/small/a499f55b-2d1e-4acd-98d2-1ac2ba6d79b9.jpg",
+ "meta": {
+ "original": {
+ "width": 472,
+ "height": 291,
+ "size": "472x291",
+ "aspect": 1.6219932
+ },
+ "small": {
+ "width": 472,
+ "height": 291,
+ "size": "472x291",
+ "aspect": 1.6219932
+ },
+ "focus": {
+ "x": 0,
+ "y": 0
+ }
+ },
+ "description": "tweet from thoughts of dog: i drank. all the water. in my bowl. earlier. but just now. i returned. to the same bowl. and it was. full again.. the bowl. is haunted",
+ "blurhash": "LARysgM_IU_3~pD%M_Rj_39FIAt6"
+ }
+ ],
+ "mentions": [],
+ "tags": [],
+ "emojis": [],
+ "card": null,
+ "poll": null
+}`, dst.String())
+ suite.Equal(msg.Event, "status.update")
+}
+
+func TestStatusUpdateTestSuite(t *testing.T) {
+ suite.Run(t, &StatusUpdateTestSuite{})
+}
diff --git a/internal/processing/stream/stream_test.go b/internal/processing/stream/stream_test.go
index bd12674e7..2569ac701 100644
--- a/internal/processing/stream/stream_test.go
+++ b/internal/processing/stream/stream_test.go
@@ -30,6 +30,7 @@ import (
type StreamTestSuite struct {
suite.Suite
testAccounts map[string]*gtsmodel.Account
+ testStatuses map[string]*gtsmodel.Status
testTokens map[string]*gtsmodel.Token
db db.DB
oauthServer oauth.Server
@@ -45,6 +46,7 @@ func (suite *StreamTestSuite) SetupTest() {
testrig.InitTestConfig()
suite.testAccounts = testrig.NewTestAccounts()
+ suite.testStatuses = testrig.NewTestStatuses()
suite.testTokens = testrig.NewTestTokens()
suite.db = testrig.NewTestDB(&suite.state)
suite.state.DB = suite.db
diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go
index e3f1e2d76..05b9acc1f 100644
--- a/internal/processing/workers/fromclientapi.go
+++ b/internal/processing/workers/fromclientapi.go
@@ -416,6 +416,11 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg messages.FromClientAP
}
}
+ // Push message that the status has been edited to streams.
+ if err := p.surface.timelineStatusUpdate(ctx, status); err != nil {
+ log.Errorf(ctx, "error streaming status edit: %v", err)
+ }
+
return nil
}
diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go
index d04e4ab8d..6dd4e543d 100644
--- a/internal/processing/workers/fromfediapi.go
+++ b/internal/processing/workers/fromfediapi.go
@@ -530,6 +530,11 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg messages.FromFediAPI) e
}
}
+ // Push message that the status has been edited to streams.
+ if err := p.surface.timelineStatusUpdate(ctx, status); err != nil {
+ log.Errorf(ctx, "error streaming status edit: %v", err)
+ }
+
return nil
}
diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go
index baebdbc66..e63b8a7c0 100644
--- a/internal/processing/workers/surfacetimeline.go
+++ b/internal/processing/workers/surfacetimeline.go
@@ -390,3 +390,176 @@ func (s *surface) invalidateStatusFromTimelines(ctx context.Context, statusID st
Errorf("error unpreparing status from list timelines: %v", err)
}
}
+
+// timelineStatusUpdate looks up HOME and LIST timelines of accounts
+// that follow the the status author and pushes edit messages into any
+// active streams.
+// Note that calling invalidateStatusFromTimelines takes care of the
+// state in general, we just need to do this for any streams that are
+// open right now.
+func (s *surface) timelineStatusUpdate(ctx context.Context, status *gtsmodel.Status) error {
+ // Ensure status fully populated; including account, mentions, etc.
+ if err := s.state.DB.PopulateStatus(ctx, status); err != nil {
+ return gtserror.Newf("error populating status with id %s: %w", status.ID, err)
+ }
+
+ // Get all local followers of the account that posted the status.
+ follows, err := s.state.DB.GetAccountLocalFollowers(ctx, status.AccountID)
+ if err != nil {
+ return gtserror.Newf("error getting local followers of account %s: %w", status.AccountID, err)
+ }
+
+ // If the poster is also local, add a fake entry for them
+ // so they can see their own status in their timeline.
+ if status.Account.IsLocal() {
+ follows = append(follows, >smodel.Follow{
+ AccountID: status.AccountID,
+ Account: status.Account,
+ Notify: func() *bool { b := false; return &b }(), // Account shouldn't notify itself.
+ ShowReblogs: func() *bool { b := true; return &b }(), // Account should show own reblogs.
+ })
+ }
+
+ // Push to streams for each local follower of this account.
+ if err := s.timelineStatusUpdateForFollowers(ctx, status, follows); err != nil {
+ return gtserror.Newf("error timelining status %s for followers: %w", status.ID, err)
+ }
+
+ return nil
+}
+
+// timelineStatusUpdateForFollowers iterates through the given
+// slice of followers of the account that posted the given status,
+// pushing update messages into open list/home streams of each
+// follower.
+func (s *surface) timelineStatusUpdateForFollowers(
+ ctx context.Context,
+ status *gtsmodel.Status,
+ follows []*gtsmodel.Follow,
+) error {
+ var (
+ errs gtserror.MultiError
+ )
+
+ for _, follow := range follows {
+ // Check to see if the status is timelineable for this follower,
+ // taking account of its visibility, who it replies to, and, if
+ // it's a reblog, whether follower account wants to see reblogs.
+ //
+ // If it's not timelineable, we can just stop early, since lists
+ // are prettymuch subsets of the home timeline, so if it shouldn't
+ // appear there, it shouldn't appear in lists either.
+ timelineable, err := s.filter.StatusHomeTimelineable(
+ ctx, follow.Account, status,
+ )
+ if err != nil {
+ errs.Appendf("error checking status %s hometimelineability: %w", status.ID, err)
+ continue
+ }
+
+ if !timelineable {
+ // Nothing to do.
+ continue
+ }
+
+ // Add status to any relevant lists
+ // for this follow, if applicable.
+ s.listTimelineStatusUpdateForFollow(
+ ctx,
+ status,
+ follow,
+ &errs,
+ )
+
+ // Add status to home timeline for owner
+ // of this follow, if applicable.
+ err = s.timelineStreamStatusUpdate(
+ ctx,
+ follow.Account,
+ status,
+ stream.TimelineHome,
+ )
+ if err != nil {
+ errs.Appendf("error home timelining status: %w", err)
+ continue
+ }
+ }
+
+ return errs.Combine()
+}
+
+// listTimelineStatusUpdateForFollow pushes edits of the given status
+// into any eligible lists streams opened by the given follower.
+func (s *surface) listTimelineStatusUpdateForFollow(
+ ctx context.Context,
+ status *gtsmodel.Status,
+ follow *gtsmodel.Follow,
+ errs *gtserror.MultiError,
+) {
+ // To put this status in appropriate list timelines,
+ // we need to get each listEntry that pertains to
+ // this follow. Then, we want to iterate through all
+ // those list entries, and add the status to the list
+ // that the entry belongs to if it meets criteria for
+ // inclusion in the list.
+
+ // Get every list entry that targets this follow's ID.
+ listEntries, err := s.state.DB.GetListEntriesForFollowID(
+ // We only need the list IDs.
+ gtscontext.SetBarebones(ctx),
+ follow.ID,
+ )
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ errs.Appendf("error getting list entries: %w", err)
+ return
+ }
+
+ // Check eligibility for each list entry (if any).
+ for _, listEntry := range listEntries {
+ eligible, err := s.listEligible(ctx, listEntry, status)
+ if err != nil {
+ errs.Appendf("error checking list eligibility: %w", err)
+ continue
+ }
+
+ if !eligible {
+ // Don't add this.
+ continue
+ }
+
+ // At this point we are certain this status
+ // should be included in the timeline of the
+ // list that this list entry belongs to.
+ if err := s.timelineStreamStatusUpdate(
+ ctx,
+ follow.Account,
+ status,
+ stream.TimelineList+":"+listEntry.ListID, // key streamType to this specific list
+ ); err != nil {
+ errs.Appendf("error adding status to timeline for list %s: %w", listEntry.ListID, err)
+ // implicit continue
+ }
+ }
+}
+
+// timelineStatusUpdate streams the edited status to the user using the
+// given streamType.
+func (s *surface) timelineStreamStatusUpdate(
+ ctx context.Context,
+ account *gtsmodel.Account,
+ status *gtsmodel.Status,
+ streamType string,
+) error {
+ apiStatus, err := s.converter.StatusToAPIStatus(ctx, status, account)
+ if err != nil {
+ err = gtserror.Newf("error converting status %s to frontend representation: %w", status.ID, err)
+ return err
+ }
+
+ if err := s.stream.StatusUpdate(apiStatus, account, []string{streamType}); err != nil {
+ err = gtserror.Newf("error streaming update for status %s: %w", status.ID, err)
+ return err
+ }
+
+ return nil
+}
diff --git a/internal/stream/stream.go b/internal/stream/stream.go
index ae815e029..da5647433 100644
--- a/internal/stream/stream.go
+++ b/internal/stream/stream.go
@@ -26,6 +26,9 @@ const (
EventTypeUpdate string = "update"
// EventTypeDelete -- something should be deleted from a user
EventTypeDelete string = "delete"
+ // EventTypeStatusUpdate -- something in the user's timeline has been edited
+ // (yes this is a confusing name, blame Mastodon)
+ EventTypeStatusUpdate string = "status.update"
)
const (