mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-13 14:52:47 +00:00
Add race testing to tests, and fix a few small race conditions in the tests (#2587)
* Add race testing to tests, and fix a few small race conditions in the tests * Enable run-sytest on MacOS * Remove deadlock detecting mutex, per code review feedback * Remove autoformatting related changes and a closure that is not needed * Adjust to importing nats client as 'natsclient' Signed-off-by: Brian Meek <brian@hntlabs.com> * Clarify the use of gooseMutex to proect goose internal state Signed-off-by: Brian Meek <brian@hntlabs.com> * Remove no longer needed mutex for guarding goose Signed-off-by: Brian Meek <brian@hntlabs.com>
This commit is contained in:
parent
9a655cb5e7
commit
de78eab63a
7 changed files with 35 additions and 11 deletions
|
@ -13,4 +13,4 @@ go build ./cmd/...
|
||||||
./build/scripts/find-lint.sh
|
./build/scripts/find-lint.sh
|
||||||
|
|
||||||
echo "Testing..."
|
echo "Testing..."
|
||||||
go test -v ./...
|
go test --race -v ./...
|
||||||
|
|
|
@ -64,7 +64,7 @@ comment. Please avoid doing this if you can.
|
||||||
We also have unit tests which we run via:
|
We also have unit tests which we run via:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
go test ./...
|
go test --race ./...
|
||||||
```
|
```
|
||||||
|
|
||||||
In general, we like submissions that come with tests. Anything that proves that the
|
In general, we like submissions that come with tests. Anything that proves that the
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -48,6 +49,7 @@ func (f *fedRoomserverAPI) QueryRoomsForUser(ctx context.Context, req *rsapi.Que
|
||||||
|
|
||||||
// TODO: This struct isn't generic, only works for TestFederationAPIJoinThenKeyUpdate
|
// TODO: This struct isn't generic, only works for TestFederationAPIJoinThenKeyUpdate
|
||||||
type fedClient struct {
|
type fedClient struct {
|
||||||
|
fedClientMutex sync.Mutex
|
||||||
api.FederationClient
|
api.FederationClient
|
||||||
allowJoins []*test.Room
|
allowJoins []*test.Room
|
||||||
keys map[gomatrixserverlib.ServerName]struct {
|
keys map[gomatrixserverlib.ServerName]struct {
|
||||||
|
@ -59,6 +61,8 @@ type fedClient struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fedClient) GetServerKeys(ctx context.Context, matrixServer gomatrixserverlib.ServerName) (gomatrixserverlib.ServerKeys, error) {
|
func (f *fedClient) GetServerKeys(ctx context.Context, matrixServer gomatrixserverlib.ServerName) (gomatrixserverlib.ServerKeys, error) {
|
||||||
|
f.fedClientMutex.Lock()
|
||||||
|
defer f.fedClientMutex.Unlock()
|
||||||
fmt.Println("GetServerKeys:", matrixServer)
|
fmt.Println("GetServerKeys:", matrixServer)
|
||||||
var keys gomatrixserverlib.ServerKeys
|
var keys gomatrixserverlib.ServerKeys
|
||||||
var keyID gomatrixserverlib.KeyID
|
var keyID gomatrixserverlib.KeyID
|
||||||
|
@ -122,6 +126,8 @@ func (f *fedClient) MakeJoin(ctx context.Context, s gomatrixserverlib.ServerName
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func (f *fedClient) SendJoin(ctx context.Context, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res gomatrixserverlib.RespSendJoin, err error) {
|
func (f *fedClient) SendJoin(ctx context.Context, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res gomatrixserverlib.RespSendJoin, err error) {
|
||||||
|
f.fedClientMutex.Lock()
|
||||||
|
defer f.fedClientMutex.Unlock()
|
||||||
for _, r := range f.allowJoins {
|
for _, r := range f.allowJoins {
|
||||||
if r.ID == event.RoomID() {
|
if r.ID == event.RoomID() {
|
||||||
r.InsertEvent(f.t, event.Headered(r.Version))
|
r.InsertEvent(f.t, event.Headered(r.Version))
|
||||||
|
@ -134,6 +140,8 @@ func (f *fedClient) SendJoin(ctx context.Context, s gomatrixserverlib.ServerName
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fedClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) {
|
func (f *fedClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) {
|
||||||
|
f.fedClientMutex.Lock()
|
||||||
|
defer f.fedClientMutex.Unlock()
|
||||||
for _, edu := range t.EDUs {
|
for _, edu := range t.EDUs {
|
||||||
if edu.Type == gomatrixserverlib.MDeviceListUpdate {
|
if edu.Type == gomatrixserverlib.MDeviceListUpdate {
|
||||||
f.sentTxn = true
|
f.sentTxn = true
|
||||||
|
@ -242,6 +250,8 @@ func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) {
|
||||||
|
|
||||||
testrig.MustPublishMsgs(t, jsctx, msg)
|
testrig.MustPublishMsgs(t, jsctx, msg)
|
||||||
time.Sleep(500 * time.Millisecond)
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
fc.fedClientMutex.Lock()
|
||||||
|
defer fc.fedClientMutex.Unlock()
|
||||||
if !fc.sentTxn {
|
if !fc.sentTxn {
|
||||||
t.Fatalf("did not send device list update")
|
t.Fatalf("did not send device list update")
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package storage_test
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
@ -103,6 +104,9 @@ func TestKeyChangesUpperLimit(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var dbLock sync.Mutex
|
||||||
|
var deviceArray = []string{"AAA", "another_device"}
|
||||||
|
|
||||||
// The purpose of this test is to make sure that the storage layer is generating sequential stream IDs per user,
|
// The purpose of this test is to make sure that the storage layer is generating sequential stream IDs per user,
|
||||||
// and that they are returned correctly when querying for device keys.
|
// and that they are returned correctly when querying for device keys.
|
||||||
func TestDeviceKeysStreamIDGeneration(t *testing.T) {
|
func TestDeviceKeysStreamIDGeneration(t *testing.T) {
|
||||||
|
@ -169,8 +173,11 @@ func TestDeviceKeysStreamIDGeneration(t *testing.T) {
|
||||||
t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=3 (new key same device) but got %d", msgs[0].StreamID)
|
t.Fatalf("Expected StoreLocalDeviceKeys to set StreamID=3 (new key same device) but got %d", msgs[0].StreamID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dbLock.Lock()
|
||||||
|
defer dbLock.Unlock()
|
||||||
// Querying for device keys returns the latest stream IDs
|
// Querying for device keys returns the latest stream IDs
|
||||||
msgs, err = db.DeviceKeysForUser(ctx, alice, []string{"AAA", "another_device"}, false)
|
msgs, err = db.DeviceKeysForUser(ctx, alice, deviceArray, false)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("DeviceKeysForUser returned error: %s", err)
|
t.Fatalf("DeviceKeysForUser returned error: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ main() {
|
||||||
|
|
||||||
if [ -d ../sytest ]; then
|
if [ -d ../sytest ]; then
|
||||||
local tmpdir
|
local tmpdir
|
||||||
tmpdir="$(mktemp -d --tmpdir run-systest.XXXXXXXXXX)"
|
tmpdir="$(mktemp -d -t run-systest.XXXXXXXXXX)"
|
||||||
trap "rm -r '$tmpdir'" EXIT
|
trap "rm -r '$tmpdir'" EXIT
|
||||||
|
|
||||||
if [ -z "$DISABLE_BUILDING_SYTEST" ]; then
|
if [ -z "$DISABLE_BUILDING_SYTEST" ]; then
|
||||||
|
|
|
@ -14,16 +14,16 @@ import (
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
natsserver "github.com/nats-io/nats-server/v2/server"
|
natsserver "github.com/nats-io/nats-server/v2/server"
|
||||||
"github.com/nats-io/nats.go"
|
|
||||||
natsclient "github.com/nats-io/nats.go"
|
natsclient "github.com/nats-io/nats.go"
|
||||||
)
|
)
|
||||||
|
|
||||||
type NATSInstance struct {
|
type NATSInstance struct {
|
||||||
*natsserver.Server
|
*natsserver.Server
|
||||||
sync.Mutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) {
|
var natsLock sync.Mutex
|
||||||
|
|
||||||
|
func DeleteAllStreams(js natsclient.JetStreamContext, cfg *config.JetStream) {
|
||||||
for _, stream := range streams { // streams are defined in streams.go
|
for _, stream := range streams { // streams are defined in streams.go
|
||||||
name := cfg.Prefixed(stream.Name)
|
name := cfg.Prefixed(stream.Name)
|
||||||
_ = js.DeleteStream(name)
|
_ = js.DeleteStream(name)
|
||||||
|
@ -31,11 +31,12 @@ func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
|
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
|
||||||
|
natsLock.Lock()
|
||||||
|
defer natsLock.Unlock()
|
||||||
// check if we need an in-process NATS Server
|
// check if we need an in-process NATS Server
|
||||||
if len(cfg.Addresses) != 0 {
|
if len(cfg.Addresses) != 0 {
|
||||||
return setupNATS(process, cfg, nil)
|
return setupNATS(process, cfg, nil)
|
||||||
}
|
}
|
||||||
s.Lock()
|
|
||||||
if s.Server == nil {
|
if s.Server == nil {
|
||||||
var err error
|
var err error
|
||||||
s.Server, err = natsserver.NewServer(&natsserver.Options{
|
s.Server, err = natsserver.NewServer(&natsserver.Options{
|
||||||
|
@ -63,7 +64,6 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
|
||||||
process.ComponentFinished()
|
process.ComponentFinished()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
s.Unlock()
|
|
||||||
if !s.ReadyForConnections(time.Second * 10) {
|
if !s.ReadyForConnections(time.Second * 10) {
|
||||||
logrus.Fatalln("NATS did not start in time")
|
logrus.Fatalln("NATS did not start in time")
|
||||||
}
|
}
|
||||||
|
@ -77,9 +77,9 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
|
||||||
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
|
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
|
||||||
if nc == nil {
|
if nc == nil {
|
||||||
var err error
|
var err error
|
||||||
opts := []nats.Option{}
|
opts := []natsclient.Option{}
|
||||||
if cfg.DisableTLSValidation {
|
if cfg.DisableTLSValidation {
|
||||||
opts = append(opts, nats.Secure(&tls.Config{
|
opts = append(opts, natsclient.Secure(&tls.Config{
|
||||||
InsecureSkipVerify: true,
|
InsecureSkipVerify: true,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,10 +12,13 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type dummyPublisher struct {
|
type dummyPublisher struct {
|
||||||
|
lock sync.Mutex
|
||||||
count int
|
count int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, statusMsg *string) error {
|
func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, statusMsg *string) error {
|
||||||
|
d.lock.Lock()
|
||||||
|
defer d.lock.Unlock()
|
||||||
d.count++
|
d.count++
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -125,11 +128,15 @@ func TestRequestPool_updatePresence(t *testing.T) {
|
||||||
go rp.cleanPresence(db, time.Millisecond*50)
|
go rp.cleanPresence(db, time.Millisecond*50)
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
publisher.lock.Lock()
|
||||||
beforeCount := publisher.count
|
beforeCount := publisher.count
|
||||||
|
publisher.lock.Unlock()
|
||||||
rp.updatePresence(db, tt.args.presence, tt.args.userID)
|
rp.updatePresence(db, tt.args.presence, tt.args.userID)
|
||||||
|
publisher.lock.Lock()
|
||||||
if tt.wantIncrease && publisher.count <= beforeCount {
|
if tt.wantIncrease && publisher.count <= beforeCount {
|
||||||
t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount)
|
t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount)
|
||||||
}
|
}
|
||||||
|
publisher.lock.Unlock()
|
||||||
time.Sleep(tt.args.sleep)
|
time.Sleep(tt.args.sleep)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue