From 192a7a792320d97fd3da0903d6b09620f0e05b35 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 2 Jul 2021 09:48:55 +0100 Subject: [PATCH] Roomserver input backpressure metric Squashed commit of the following: commit 56e934ac0aeedcfb2c072010959ba49734d4e0cb Author: Neil Alexander Date: Fri Jul 2 09:39:30 2021 +0100 Fix metric commit 3911f3a0c17b164b012e881c085ceca30f5de408 Author: Neil Alexander Date: Fri Jul 2 09:36:29 2021 +0100 Register correct metric commit a9ddbfaed421538a701151801e9451198a8be4f3 Author: Neil Alexander Date: Fri Jul 2 09:33:33 2021 +0100 Try to capture RS input backpressure metric --- roomserver/internal/input/input.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index b8279a866..6bc43c9cd 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -28,6 +28,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "go.uber.org/atomic" ) @@ -64,6 +65,9 @@ func (w *inputWorker) start() { if !ok { continue } + roomserverInputBackpressure.With(prometheus.Labels{ + "room_id": task.event.Event.RoomID(), + }).Dec() hooks.Run(hooks.KindNewEventReceived, task.event.Event) _, task.err = w.r.processRoomEvent(task.ctx, task.event) if task.err == nil { @@ -120,6 +124,20 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) er return errs } +func init() { + prometheus.MustRegister(roomserverInputBackpressure) +} + +var roomserverInputBackpressure = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "dendrite", + Subsystem: "roomserver", + Name: "input_backpressure", + Help: "How many events are queued for input for a given room", + }, + []string{"room_id"}, +) + // InputRoomEvents implements api.RoomserverInternalAPI func (r *Inputer) InputRoomEvents( _ context.Context, @@ -164,6 +182,9 @@ func (r *Inputer) InputRoomEvents( go worker.start() } worker.input.push(tasks[i]) + roomserverInputBackpressure.With(prometheus.Labels{ + "room_id": roomID, + }).Inc() } // Wait for all of the workers to return results about our tasks.