Add polling for req logs, store them async, fix sort order

This commit is contained in:
David Stotijn 2020-09-27 14:58:37 +02:00
parent 91947b9ffa
commit 914650df42
3 changed files with 63 additions and 29 deletions

View file

@ -28,7 +28,9 @@ function LogsOverview(): JSX.Element {
const detailReqLogId = router.query.id as string;
console.log(detailReqLogId);
const { loading, error, data } = useQuery(HTTP_REQUEST_LOGS);
const { loading, error, data } = useQuery(HTTP_REQUEST_LOGS, {
pollInterval: 1000,
});
const handleLogClick = (reqId: string) => {
router.push("/proxy/logs?id=" + reqId, undefined, {

View file

@ -8,6 +8,7 @@ import (
"net/url"
"path"
"strings"
"sync"
"time"
"github.com/cayleygraph/cayley"
@ -54,6 +55,7 @@ type HTTPHeader struct {
type Database struct {
store *cayley.Handle
schema *schema.Config
mu sync.Mutex
}
func init() {
@ -106,11 +108,14 @@ func (db *Database) Close() error {
}
func (db *Database) FindAllRequestLogs(ctx context.Context) ([]reqlog.Request, error) {
db.mu.Lock()
defer db.mu.Unlock()
var reqLogs []reqlog.Request
var reqs []HTTPRequest
path := cayley.StartPath(db.store, quad.IRI("hy:HTTPRequest")).In(quad.IRI(rdf.Type))
err := path.Iterate(ctx).EachValue(nil, func(v quad.Value) {
err := path.Iterate(ctx).EachValue(db.store, func(v quad.Value) {
var req HTTPRequest
if err := db.schema.LoadToDepth(ctx, db.store, &req, -1, v); err != nil {
log.Printf("[ERROR] Could not load sub-graph for http requests: %v", err)
@ -130,10 +135,20 @@ func (db *Database) FindAllRequestLogs(ctx context.Context) ([]reqlog.Request, e
reqLogs = append(reqLogs, reqLog)
}
// By default, all retrieved requests are ordered chronologically, oldest first.
// Reverse the order, so newest logs are first.
for i := len(reqLogs)/2 - 1; i >= 0; i-- {
opp := len(reqLogs) - 1 - i
reqLogs[i], reqLogs[opp] = reqLogs[opp], reqLogs[i]
}
return reqLogs, nil
}
func (db *Database) FindRequestLogByID(ctx context.Context, id uuid.UUID) (reqlog.Request, error) {
db.mu.Lock()
defer db.mu.Unlock()
var req HTTPRequest
err := db.schema.LoadTo(ctx, db.store, &req, iriFromUUID(id))
if schema.IsNotFound(err) {
@ -152,6 +167,9 @@ func (db *Database) FindRequestLogByID(ctx context.Context, id uuid.UUID) (reqlo
}
func (db *Database) AddRequestLog(ctx context.Context, reqLog reqlog.Request) error {
db.mu.Lock()
defer db.mu.Unlock()
httpReq := HTTPRequest{
ID: iriFromUUID(reqLog.ID),
Proto: reqLog.Request.Proto,
@ -162,18 +180,25 @@ func (db *Database) AddRequestLog(ctx context.Context, reqLog reqlog.Request) er
Timestamp: reqLog.Timestamp,
}
qw := graph.NewWriter(db.store)
defer qw.Close()
tx := cayley.NewTransaction()
qw := graph.NewTxWriter(tx, graph.Add)
_, err := db.schema.WriteAsQuads(qw, httpReq)
if err != nil {
return fmt.Errorf("cayley: could not write quads: %v", err)
}
if err := db.store.ApplyTransaction(tx); err != nil {
return fmt.Errorf("cayley: could not apply transaction: %v", err)
}
return nil
}
func (db *Database) AddResponseLog(ctx context.Context, resLog reqlog.Response) error {
db.mu.Lock()
defer db.mu.Unlock()
httpRes := HTTPResponse{
RequestID: iriFromUUID(resLog.RequestID),
Proto: resLog.Response.Proto,
@ -184,14 +209,18 @@ func (db *Database) AddResponseLog(ctx context.Context, resLog reqlog.Response)
Timestamp: resLog.Timestamp,
}
qw := graph.NewWriter(db.store)
defer qw.Close()
tx := cayley.NewTransaction()
qw := graph.NewTxWriter(tx, graph.Add)
_, err := db.schema.WriteAsQuads(qw, httpRes)
if err != nil {
return fmt.Errorf("cayley: could not write response quads: %v", err)
}
if err := db.store.ApplyTransaction(tx); err != nil {
return fmt.Errorf("cayley: could not apply transaction: %v", err)
}
return nil
}

View file

@ -60,6 +60,18 @@ func (svc *Service) addRequest(ctx context.Context, reqID uuid.UUID, req http.Re
}
func (svc *Service) addResponse(ctx context.Context, reqID uuid.UUID, res http.Response, body []byte) error {
if res.Header.Get("Content-Encoding") == "gzip" {
gzipReader, err := gzip.NewReader(bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("reqlog: could not create gzip reader: %v", err)
}
defer gzipReader.Close()
body, err = ioutil.ReadAll(gzipReader)
if err != nil {
return fmt.Errorf("reqlog: could not read gzipped response body: %v", err)
}
}
resLog := Response{
RequestID: reqID,
Response: res,
@ -93,10 +105,11 @@ func (svc *Service) RequestModifier(next proxy.RequestModifyFunc) proxy.RequestM
return
}
err := svc.addRequest(req.Context(), reqID, *clone, body)
if err != nil {
log.Printf("[ERROR] Could not store request log: %v", err)
}
go func() {
if err := svc.addRequest(context.Background(), reqID, *clone, body); err != nil {
log.Printf("[ERROR] Could not store request log: %v", err)
}
}()
}
}
@ -106,6 +119,11 @@ func (svc *Service) ResponseModifier(next proxy.ResponseModifyFunc) proxy.Respon
return err
}
reqID, _ := res.Request.Context().Value(proxy.ReqIDKey).(uuid.UUID)
if reqID == uuid.Nil {
return errors.New("reqlog: request is missing ID")
}
clone := *res
// TODO: Use io.LimitReader.
@ -115,26 +133,11 @@ func (svc *Service) ResponseModifier(next proxy.ResponseModifyFunc) proxy.Respon
}
res.Body = ioutil.NopCloser(bytes.NewBuffer(body))
if res.Header.Get("Content-Encoding") == "gzip" {
gzipReader, err := gzip.NewReader(bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("reqlog: could not create gzip reader: %v", err)
go func() {
if err := svc.addResponse(res.Request.Context(), reqID, clone, body); err != nil {
log.Printf("[ERROR] Could not store response log: %v", err)
}
defer gzipReader.Close()
body, err = ioutil.ReadAll(gzipReader)
if err != nil {
return fmt.Errorf("reqlog: could not read gzipped response body: %v", err)
}
}
reqID, _ := res.Request.Context().Value(proxy.ReqIDKey).(uuid.UUID)
if reqID == uuid.Nil {
return errors.New("reqlog: request is missing ID")
}
if err := svc.addResponse(res.Request.Context(), reqID, clone, body); err != nil {
return fmt.Errorf("reqlog: could not add response: %v", err)
}
}()
return nil
}