mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-14 23:32:45 +00:00
Use HTTP API for roomserver input. (#161)
* Use HTTP API for roomserver input. * Use synchronous HTTP API for writing events to the roomserver * Remove unused config for kafka topic * Tweak comments
This commit is contained in:
parent
d9b8e5de45
commit
e6d77d6bde
10 changed files with 94 additions and 271 deletions
|
@ -15,35 +15,24 @@
|
||||||
package producers
|
package producers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// RoomserverProducer produces events for the roomserver to consume.
|
// RoomserverProducer produces events for the roomserver to consume.
|
||||||
type RoomserverProducer struct {
|
type RoomserverProducer struct {
|
||||||
Topic string
|
InputAPI api.RoomserverInputAPI
|
||||||
Producer sarama.SyncProducer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRoomserverProducer creates a new RoomserverProducer
|
// NewRoomserverProducer creates a new RoomserverProducer
|
||||||
func NewRoomserverProducer(kafkaURIs []string, topic string) (*RoomserverProducer, error) {
|
func NewRoomserverProducer(roomserverURI string) *RoomserverProducer {
|
||||||
producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &RoomserverProducer{
|
return &RoomserverProducer{
|
||||||
Topic: topic,
|
InputAPI: api.NewRoomserverInputAPIHTTP(roomserverURI, nil),
|
||||||
Producer: producer,
|
}
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
|
// SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
|
||||||
func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName) error {
|
func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName) error {
|
||||||
eventIDs := make([]string, len(events))
|
|
||||||
ires := make([]api.InputRoomEvent, len(events))
|
ires := make([]api.InputRoomEvent, len(events))
|
||||||
for i, event := range events {
|
for i, event := range events {
|
||||||
ires[i] = api.InputRoomEvent{
|
ires[i] = api.InputRoomEvent{
|
||||||
|
@ -52,9 +41,8 @@ func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAs
|
||||||
AuthEventIDs: event.AuthEventIDs(),
|
AuthEventIDs: event.AuthEventIDs(),
|
||||||
SendAsServer: string(sendAsServer),
|
SendAsServer: string(sendAsServer),
|
||||||
}
|
}
|
||||||
eventIDs[i] = event.EventID()
|
|
||||||
}
|
}
|
||||||
return c.SendInputRoomEvents(ires, eventIDs)
|
return c.SendInputRoomEvents(ires)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendEventWithState writes an event with KindNew to the roomserver input log
|
// SendEventWithState writes an event with KindNew to the roomserver input log
|
||||||
|
@ -65,7 +53,6 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
eventIDs := make([]string, len(outliers)+1)
|
|
||||||
ires := make([]api.InputRoomEvent, len(outliers)+1)
|
ires := make([]api.InputRoomEvent, len(outliers)+1)
|
||||||
for i, outlier := range outliers {
|
for i, outlier := range outliers {
|
||||||
ires[i] = api.InputRoomEvent{
|
ires[i] = api.InputRoomEvent{
|
||||||
|
@ -73,7 +60,6 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat
|
||||||
Event: outlier,
|
Event: outlier,
|
||||||
AuthEventIDs: outlier.AuthEventIDs(),
|
AuthEventIDs: outlier.AuthEventIDs(),
|
||||||
}
|
}
|
||||||
eventIDs[i] = outlier.EventID()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stateEventIDs := make([]string, len(state.StateEvents))
|
stateEventIDs := make([]string, len(state.StateEvents))
|
||||||
|
@ -88,41 +74,14 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat
|
||||||
HasState: true,
|
HasState: true,
|
||||||
StateEventIDs: stateEventIDs,
|
StateEventIDs: stateEventIDs,
|
||||||
}
|
}
|
||||||
eventIDs[len(outliers)] = event.EventID()
|
|
||||||
|
|
||||||
return c.SendInputRoomEvents(ires, eventIDs)
|
return c.SendInputRoomEvents(ires)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both
|
// SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both
|
||||||
// arrays must match, and each element must correspond to the same event.
|
// arrays must match, and each element must correspond to the same event.
|
||||||
func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent, eventIDs []string) error {
|
func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent) error {
|
||||||
// TODO: Nicer way of doing this. Options are:
|
request := api.InputRoomEventsRequest{InputRoomEvents: ires}
|
||||||
// A) Like this
|
var response api.InputRoomEventsResponse
|
||||||
// B) Add EventID field to InputRoomEvent
|
return c.InputAPI.InputRoomEvents(&request, &response)
|
||||||
// C) Add wrapper struct with the EventID and the InputRoomEvent
|
|
||||||
if len(eventIDs) != len(ires) {
|
|
||||||
return fmt.Errorf("WriteInputRoomEvents: length mismatch %d != %d", len(eventIDs), len(ires))
|
|
||||||
}
|
|
||||||
|
|
||||||
msgs := make([]*sarama.ProducerMessage, len(ires))
|
|
||||||
for i := range ires {
|
|
||||||
msg, err := c.toProducerMessage(ires[i], eventIDs[i])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
msgs[i] = msg
|
|
||||||
}
|
|
||||||
return c.Producer.SendMessages(msgs)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *RoomserverProducer) toProducerMessage(ire api.InputRoomEvent, eventID string) (*sarama.ProducerMessage, error) {
|
|
||||||
value, err := json.Marshal(ire)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var m sarama.ProducerMessage
|
|
||||||
m.Topic = c.Topic
|
|
||||||
m.Key = sarama.StringEncoder(eventID)
|
|
||||||
m.Value = sarama.ByteEncoder(value)
|
|
||||||
return &m, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,9 +51,9 @@ func main() {
|
||||||
|
|
||||||
log.Info("config: ", cfg)
|
log.Info("config: ", cfg)
|
||||||
|
|
||||||
roomserverProducer, err := producers.NewRoomserverProducer(
|
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
||||||
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent),
|
|
||||||
)
|
roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL())
|
||||||
userUpdateProducer, err := producers.NewUserUpdateProducer(
|
userUpdateProducer, err := producers.NewUserUpdateProducer(
|
||||||
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates),
|
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates),
|
||||||
)
|
)
|
||||||
|
@ -65,7 +65,6 @@ func main() {
|
||||||
cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
|
cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
|
||||||
)
|
)
|
||||||
|
|
||||||
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
|
||||||
accountDB, err := accounts.NewDatabase(string(cfg.Database.Account), cfg.Matrix.ServerName)
|
accountDB, err := accounts.NewDatabase(string(cfg.Database.Account), cfg.Matrix.ServerName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("Failed to setup account database(%q): %s", cfg.Database.Account, err.Error())
|
log.Panicf("Failed to setup account database(%q): %s", cfg.Database.Account, err.Error())
|
||||||
|
|
|
@ -67,9 +67,7 @@ func main() {
|
||||||
|
|
||||||
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
||||||
|
|
||||||
roomserverProducer, err := producers.NewRoomserverProducer(
|
roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL())
|
||||||
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent),
|
|
||||||
)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("Failed to setup kafka producers(%s): %s", cfg.Kafka.Addresses, err)
|
log.Panicf("Failed to setup kafka producers(%s): %s", cfg.Kafka.Addresses, err)
|
||||||
|
|
|
@ -16,11 +16,9 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
_ "net/http/pprof"
|
_ "net/http/pprof"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
@ -35,7 +33,6 @@ import (
|
||||||
var (
|
var (
|
||||||
logDir = os.Getenv("LOG_DIR")
|
logDir = os.Getenv("LOG_DIR")
|
||||||
configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.")
|
configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.")
|
||||||
stopProcessingAfter = os.Getenv("STOP_AFTER")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -56,49 +53,25 @@ func main() {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
|
kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer := input.Consumer{
|
|
||||||
ContinualConsumer: common.ContinualConsumer{
|
|
||||||
Topic: string(cfg.Kafka.Topics.InputRoomEvent),
|
|
||||||
Consumer: kafkaConsumer,
|
|
||||||
PartitionStore: db,
|
|
||||||
},
|
|
||||||
DB: db,
|
|
||||||
Producer: kafkaProducer,
|
|
||||||
OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
|
||||||
}
|
|
||||||
|
|
||||||
if stopProcessingAfter != "" {
|
|
||||||
count, err := strconv.ParseInt(stopProcessingAfter, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
consumer.StopProcessingAfter = &count
|
|
||||||
consumer.ShutdownCallback = func(message string) {
|
|
||||||
fmt.Println("Stopping roomserver", message)
|
|
||||||
os.Exit(0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = consumer.Start(); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
queryAPI := query.RoomserverQueryAPI{
|
queryAPI := query.RoomserverQueryAPI{
|
||||||
DB: db,
|
DB: db,
|
||||||
}
|
}
|
||||||
|
|
||||||
queryAPI.SetupHTTP(http.DefaultServeMux)
|
queryAPI.SetupHTTP(http.DefaultServeMux)
|
||||||
|
|
||||||
|
inputAPI := input.RoomserverInputAPI{
|
||||||
|
DB: db,
|
||||||
|
Producer: kafkaProducer,
|
||||||
|
OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||||
|
}
|
||||||
|
|
||||||
|
inputAPI.SetupHTTP(http.DefaultServeMux)
|
||||||
|
|
||||||
http.DefaultServeMux.Handle("/metrics", prometheus.Handler())
|
http.DefaultServeMux.Handle("/metrics", prometheus.Handler())
|
||||||
|
|
||||||
log.Info("Started room server on ", cfg.Listen.RoomServer)
|
log.Info("Started room server on ", cfg.Listen.RoomServer)
|
||||||
|
|
|
@ -23,6 +23,10 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"net/http"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common/test"
|
"github.com/matrix-org/dendrite/common/test"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
@ -90,7 +94,7 @@ func createDatabase(database string) error {
|
||||||
// messages is reached or after a timeout. It kills the command before it returns.
|
// messages is reached or after a timeout. It kills the command before it returns.
|
||||||
// It returns a list of the messages read from the command on success or an error
|
// It returns a list of the messages read from the command on success or an error
|
||||||
// on failure.
|
// on failure.
|
||||||
func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAPI func()) ([]string, error) {
|
func runAndReadFromTopic(runCmd *exec.Cmd, readyURL string, doInput func(), topic string, count int, checkQueryAPI func()) ([]string, error) {
|
||||||
type result struct {
|
type result struct {
|
||||||
// data holds all of stdout on success.
|
// data holds all of stdout on success.
|
||||||
data []byte
|
data []byte
|
||||||
|
@ -107,6 +111,11 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP
|
||||||
)
|
)
|
||||||
// Send stderr to our stderr so the user can see any error messages.
|
// Send stderr to our stderr so the user can see any error messages.
|
||||||
readCmd.Stderr = os.Stderr
|
readCmd.Stderr = os.Stderr
|
||||||
|
|
||||||
|
// Kill both processes before we exit.
|
||||||
|
defer func() { runCmd.Process.Kill() }()
|
||||||
|
defer func() { readCmd.Process.Kill() }()
|
||||||
|
|
||||||
// Run the command, read the messages and wait for a timeout in parallel.
|
// Run the command, read the messages and wait for a timeout in parallel.
|
||||||
go func() {
|
go func() {
|
||||||
// Read all of stdout.
|
// Read all of stdout.
|
||||||
|
@ -131,14 +140,40 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP
|
||||||
time.Sleep(timeout)
|
time.Sleep(timeout)
|
||||||
done <- result{nil, fmt.Errorf("Timeout reading %d messages from topic %q", count, topic)}
|
done <- result{nil, fmt.Errorf("Timeout reading %d messages from topic %q", count, topic)}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// Poll the HTTP listener of the process waiting for it to be ready to receive requests.
|
||||||
|
ready := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
delay := 10 * time.Millisecond
|
||||||
|
for {
|
||||||
|
time.Sleep(delay)
|
||||||
|
if delay < 100*time.Millisecond {
|
||||||
|
delay *= 2
|
||||||
|
}
|
||||||
|
resp, err := http.Get(readyURL)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if resp.StatusCode == 200 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ready <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for the roomserver to be ready to receive input or for it to crash.
|
||||||
|
select {
|
||||||
|
case <-ready:
|
||||||
|
case r := <-done:
|
||||||
|
return nil, r.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write the input now that the server is running.
|
||||||
|
doInput()
|
||||||
|
|
||||||
// Wait for one of the tasks to finsh.
|
// Wait for one of the tasks to finsh.
|
||||||
r := <-done
|
r := <-done
|
||||||
|
|
||||||
// Kill both processes. We don't check if the processes are running and
|
|
||||||
// we ignore failures since we are just trying to clean up before returning.
|
|
||||||
runCmd.Process.Kill()
|
|
||||||
readCmd.Process.Kill()
|
|
||||||
|
|
||||||
if r.err != nil {
|
if r.err != nil {
|
||||||
return nil, r.err
|
return nil, r.err
|
||||||
}
|
}
|
||||||
|
@ -153,6 +188,20 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP
|
||||||
return lines, nil
|
return lines, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func writeToRoomServer(input []string, roomserverURL string) error {
|
||||||
|
var request api.InputRoomEventsRequest
|
||||||
|
var response api.InputRoomEventsResponse
|
||||||
|
var err error
|
||||||
|
request.InputRoomEvents = make([]api.InputRoomEvent, len(input))
|
||||||
|
for i := range input {
|
||||||
|
if err = json.Unmarshal([]byte(input[i]), &request.InputRoomEvents[i]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
x := api.NewRoomserverInputAPIHTTP(roomserverURL, nil)
|
||||||
|
return x.InputRoomEvents(&request, &response)
|
||||||
|
}
|
||||||
|
|
||||||
// testRoomserver is used to run integration tests against a single roomserver.
|
// testRoomserver is used to run integration tests against a single roomserver.
|
||||||
// It creates new kafka topics for the input and output of the roomserver.
|
// It creates new kafka topics for the input and output of the roomserver.
|
||||||
// It writes the input messages to the input kafka topic, formatting each message
|
// It writes the input messages to the input kafka topic, formatting each message
|
||||||
|
@ -176,25 +225,23 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
inputTopic := string(cfg.Kafka.Topics.InputRoomEvent)
|
|
||||||
outputTopic := string(cfg.Kafka.Topics.OutputRoomEvent)
|
outputTopic := string(cfg.Kafka.Topics.OutputRoomEvent)
|
||||||
|
|
||||||
exe.DeleteTopic(inputTopic)
|
|
||||||
if err := exe.CreateTopic(inputTopic); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
exe.DeleteTopic(outputTopic)
|
exe.DeleteTopic(outputTopic)
|
||||||
if err := exe.CreateTopic(outputTopic); err != nil {
|
if err := exe.CreateTopic(outputTopic); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil {
|
if err = createDatabase(testDatabaseName); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = createDatabase(testDatabaseName); err != nil {
|
doInput := func() {
|
||||||
|
fmt.Printf("Roomserver is ready to receive input, sending %d events\n", len(input))
|
||||||
|
if err = writeToRoomServer(input, cfg.RoomServerURL()); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
cmd := exec.Command(filepath.Join(filepath.Dir(os.Args[0]), "dendrite-room-server"))
|
cmd := exec.Command(filepath.Join(filepath.Dir(os.Args[0]), "dendrite-room-server"))
|
||||||
|
|
||||||
|
@ -205,7 +252,7 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R
|
||||||
cmd.Stderr = os.Stderr
|
cmd.Stderr = os.Stderr
|
||||||
cmd.Args = []string{"dendrite-room-server", "--config", filepath.Join(dir, test.ConfigFile)}
|
cmd.Args = []string{"dendrite-room-server", "--config", filepath.Join(dir, test.ConfigFile)}
|
||||||
|
|
||||||
gotOutput, err := runAndReadFromTopic(cmd, outputTopic, len(wantOutput), func() {
|
gotOutput, err := runAndReadFromTopic(cmd, cfg.RoomServerURL()+"/metrics", doInput, outputTopic, len(wantOutput), func() {
|
||||||
queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+string(cfg.Listen.RoomServer), nil)
|
queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+string(cfg.Listen.RoomServer), nil)
|
||||||
checkQueries(queryAPI)
|
checkQueries(queryAPI)
|
||||||
})
|
})
|
||||||
|
|
|
@ -19,13 +19,14 @@ import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"golang.org/x/crypto/ed25519"
|
|
||||||
"gopkg.in/yaml.v2"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"golang.org/x/crypto/ed25519"
|
||||||
|
"gopkg.in/yaml.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Version is the current version of the config format.
|
// Version is the current version of the config format.
|
||||||
|
@ -95,8 +96,6 @@ type Dendrite struct {
|
||||||
Addresses []string `yaml:"addresses"`
|
Addresses []string `yaml:"addresses"`
|
||||||
// The names of the topics to use when reading and writing from kafka.
|
// The names of the topics to use when reading and writing from kafka.
|
||||||
Topics struct {
|
Topics struct {
|
||||||
// Topic for roomserver/api.InputRoomEvent events.
|
|
||||||
InputRoomEvent Topic `yaml:"input_room_event"`
|
|
||||||
// Topic for roomserver/api.OutputRoomEvent events.
|
// Topic for roomserver/api.OutputRoomEvent events.
|
||||||
OutputRoomEvent Topic `yaml:"output_room_event"`
|
OutputRoomEvent Topic `yaml:"output_room_event"`
|
||||||
// Topic for user updates (profile, presence)
|
// Topic for user updates (profile, presence)
|
||||||
|
@ -298,7 +297,6 @@ func (config *Dendrite) check() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses)))
|
checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses)))
|
||||||
checkNotEmpty("kafka.topics.input_room_event", string(config.Kafka.Topics.InputRoomEvent))
|
|
||||||
checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
|
checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
|
||||||
checkNotEmpty("database.account", string(config.Database.Account))
|
checkNotEmpty("database.account", string(config.Database.Account))
|
||||||
checkNotEmpty("database.device", string(config.Database.Device))
|
checkNotEmpty("database.device", string(config.Database.Device))
|
||||||
|
|
|
@ -21,14 +21,15 @@ import (
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/matrix-org/dendrite/common/config"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"gopkg.in/yaml.v2"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/big"
|
"math/big"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"gopkg.in/yaml.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -80,7 +81,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
|
||||||
cfg.Kafka.Addresses = []string{kafkaURI}
|
cfg.Kafka.Addresses = []string{kafkaURI}
|
||||||
// TODO: Different servers should be using different topics.
|
// TODO: Different servers should be using different topics.
|
||||||
// Make this configurable somehow?
|
// Make this configurable somehow?
|
||||||
cfg.Kafka.Topics.InputRoomEvent = "test.room.input"
|
|
||||||
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
|
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
|
||||||
|
|
||||||
// TODO: Use different databases for the different schemas.
|
// TODO: Use different databases for the different schemas.
|
||||||
|
|
|
@ -1,133 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
// Package input contains the code that writes
|
|
||||||
package input
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common"
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
|
||||||
)
|
|
||||||
|
|
||||||
// A ConsumerDatabase has the storage APIs needed by the consumer.
|
|
||||||
type ConsumerDatabase interface {
|
|
||||||
RoomEventDatabase
|
|
||||||
common.PartitionStorer
|
|
||||||
}
|
|
||||||
|
|
||||||
// An ErrorLogger handles the errors encountered by the consumer.
|
|
||||||
type ErrorLogger interface {
|
|
||||||
OnError(message *sarama.ConsumerMessage, err error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// A Consumer consumes a kafkaesque stream of room events.
|
|
||||||
// The room events are supplied as api.InputRoomEvent structs serialised as JSON.
|
|
||||||
// The events should be valid matrix events.
|
|
||||||
// The events needed to authenticate the event should already be stored on the roomserver.
|
|
||||||
// The events needed to construct the state at the event should already be stored on the roomserver.
|
|
||||||
// If the event is not valid then it will be discarded and an error will be logged.
|
|
||||||
type Consumer struct {
|
|
||||||
ContinualConsumer common.ContinualConsumer
|
|
||||||
// The database used to store the room events.
|
|
||||||
DB ConsumerDatabase
|
|
||||||
Producer sarama.SyncProducer
|
|
||||||
// The kafkaesque topic to output new room events to.
|
|
||||||
// This is the name used in kafka to identify the stream to write events to.
|
|
||||||
OutputRoomEventTopic string
|
|
||||||
// The ErrorLogger for this consumer.
|
|
||||||
// If left as nil then the consumer will panic when it encounters an error
|
|
||||||
ErrorLogger ErrorLogger
|
|
||||||
// If non-nil then the consumer will stop processing messages after this
|
|
||||||
// many messages and will shutdown. Malformed messages are included in the count.
|
|
||||||
StopProcessingAfter *int64
|
|
||||||
// If not-nil then the consumer will call this to shutdown the server.
|
|
||||||
ShutdownCallback func(reason string)
|
|
||||||
// How many messages the consumer has processed.
|
|
||||||
processed int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteOutputRoomEvent implements OutputRoomEventWriter
|
|
||||||
func (c *Consumer) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error {
|
|
||||||
var m sarama.ProducerMessage
|
|
||||||
oe := api.OutputEvent{
|
|
||||||
Type: api.OutputTypeNewRoomEvent,
|
|
||||||
NewRoomEvent: &output,
|
|
||||||
}
|
|
||||||
value, err := json.Marshal(oe)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
m.Topic = c.OutputRoomEventTopic
|
|
||||||
m.Key = sarama.StringEncoder("")
|
|
||||||
m.Value = sarama.ByteEncoder(value)
|
|
||||||
_, _, err = c.Producer.SendMessage(&m)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start starts the consumer consuming.
|
|
||||||
// Starts up a goroutine for each partition in the kafka stream.
|
|
||||||
// Returns nil once all the goroutines are started.
|
|
||||||
// Returns an error if it can't start consuming for any of the partitions.
|
|
||||||
func (c *Consumer) Start() error {
|
|
||||||
c.ContinualConsumer.ProcessMessage = c.processMessage
|
|
||||||
c.ContinualConsumer.ShutdownCallback = c.shutdown
|
|
||||||
return c.ContinualConsumer.Start()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Consumer) processMessage(message *sarama.ConsumerMessage) error {
|
|
||||||
var input api.InputRoomEvent
|
|
||||||
if err := json.Unmarshal(message.Value, &input); err != nil {
|
|
||||||
// If the message is invalid then log it and move onto the next message in the stream.
|
|
||||||
c.logError(message, err)
|
|
||||||
} else {
|
|
||||||
if err := processRoomEvent(c.DB, c, input); err != nil {
|
|
||||||
// If there was an error processing the message then log it and
|
|
||||||
// move onto the next message in the stream.
|
|
||||||
// TODO: If the error was due to a problem talking to the database
|
|
||||||
// then we shouldn't move onto the next message and we should either
|
|
||||||
// retry processing the message, or panic and kill ourselves.
|
|
||||||
c.logError(message, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Update the number of processed messages using atomic addition because it is accessed from multiple goroutines.
|
|
||||||
processed := atomic.AddInt64(&c.processed, 1)
|
|
||||||
// Check if we should stop processing.
|
|
||||||
// Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages.
|
|
||||||
// If we try to stop processing after M message and we have N goroutines then we will process somewhere
|
|
||||||
// between M and (N + M) messages because the N goroutines could all try to process what they think will be the
|
|
||||||
// last message. We could be more careful here but this is good enough for getting rough benchmarks.
|
|
||||||
if c.StopProcessingAfter != nil && processed >= int64(*c.StopProcessingAfter) {
|
|
||||||
return common.ErrShutdown
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Consumer) shutdown() {
|
|
||||||
if c.ShutdownCallback != nil {
|
|
||||||
c.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", c.processed))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// logError is a convenience method for logging errors.
|
|
||||||
func (c *Consumer) logError(message *sarama.ConsumerMessage, err error) {
|
|
||||||
if c.ErrorLogger == nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
c.ErrorLogger.OnError(message, err)
|
|
||||||
}
|
|
|
@ -16,12 +16,9 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type statements struct {
|
type statements struct {
|
||||||
common.PartitionOffsetStatements
|
|
||||||
eventTypeStatements
|
eventTypeStatements
|
||||||
eventStateKeyStatements
|
eventStateKeyStatements
|
||||||
roomStatements
|
roomStatements
|
||||||
|
@ -35,10 +32,6 @@ type statements struct {
|
||||||
func (s *statements) prepare(db *sql.DB) error {
|
func (s *statements) prepare(db *sql.DB) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if err = s.PartitionOffsetStatements.Prepare(db); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = s.eventTypeStatements.prepare(db); err != nil {
|
if err = s.eventTypeStatements.prepare(db); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/common"
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
@ -42,16 +41,6 @@ func Open(dataSourceName string) (*Database, error) {
|
||||||
return &d, nil
|
return &d, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PartitionOffsets implements input.ConsumerDatabase
|
|
||||||
func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
|
|
||||||
return d.statements.SelectPartitionOffsets(topic)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetPartitionOffset implements input.ConsumerDatabase
|
|
||||||
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
|
||||||
return d.statements.UpsertPartitionOffset(topic, partition, offset)
|
|
||||||
}
|
|
||||||
|
|
||||||
// StoreEvent implements input.EventDatabase
|
// StoreEvent implements input.EventDatabase
|
||||||
func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error) {
|
func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error) {
|
||||||
var (
|
var (
|
||||||
|
|
Loading…
Reference in a new issue