feat: add tunnel monitor web ui
Some checks failed
continuous-integration/drone/push Build is failing
Some checks failed
continuous-integration/drone/push Build is failing
This commit is contained in:
69
store/record.go
Normal file
69
store/record.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type TunnelRecord struct {
|
||||
ID uuid.UUID
|
||||
Time time.Time
|
||||
URL *url.URL
|
||||
Method string
|
||||
Status int
|
||||
SourceAddr string
|
||||
|
||||
RequestHeaders http.Header
|
||||
RequestBodyType string
|
||||
RequestBody []byte
|
||||
|
||||
ResponseHeaders http.Header
|
||||
ResponseBodyType string
|
||||
ResponseBody []byte
|
||||
}
|
||||
|
||||
func (tr *TunnelRecord) MarshalJSON() ([]byte, error) {
|
||||
type Alias TunnelRecord
|
||||
return json.Marshal(&struct {
|
||||
*Alias
|
||||
URL string `json:"URL"`
|
||||
Time string `json:"Time"`
|
||||
}{
|
||||
Alias: (*Alias)(tr),
|
||||
URL: tr.URL.String(),
|
||||
Time: tr.Time.Format(time.RFC3339),
|
||||
})
|
||||
}
|
||||
|
||||
func (tr *TunnelRecord) UnmarshalJSON(data []byte) error {
|
||||
type Alias TunnelRecord
|
||||
aux := &struct {
|
||||
*Alias
|
||||
URL string `json:"URL"`
|
||||
Time string `json:"Time"`
|
||||
}{
|
||||
Alias: (*Alias)(tr),
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, &aux); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
parsedURL, err := url.Parse(aux.URL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tr.URL = parsedURL
|
||||
|
||||
parsedTime, err := time.Parse(time.RFC3339, aux.Time)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tr.Time = parsedTime
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"io"
|
||||
"mime"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -21,29 +20,16 @@ const (
|
||||
|
||||
var ErrRecordNotFound = errors.New("record not found")
|
||||
|
||||
type OnEntryHandler func(record *TunnelRecord)
|
||||
|
||||
type TunnelStore interface {
|
||||
Get(before time.Time, count int) (results []*TunnelRecord, more bool)
|
||||
Subscribe() <-chan *TunnelRecord
|
||||
RecordTCP()
|
||||
RecordRequest(req *http.Request)
|
||||
RecordRequest(req *http.Request, sourceAddress string)
|
||||
RecordResponse(resp *http.Response) error
|
||||
}
|
||||
|
||||
type TunnelRecord struct {
|
||||
ID uuid.UUID
|
||||
Time time.Time
|
||||
URL *url.URL
|
||||
Method string
|
||||
Status int
|
||||
|
||||
RequestHeaders http.Header
|
||||
RequestBodyType string
|
||||
RequestBody []byte
|
||||
|
||||
ResponseHeaders http.Header
|
||||
ResponseBodyType string
|
||||
ResponseBody []byte
|
||||
}
|
||||
|
||||
func NewTunnelStore(queueSize int) TunnelStore {
|
||||
if queueSize <= 0 {
|
||||
queueSize = defaultQueueSize
|
||||
@@ -57,9 +43,25 @@ func NewTunnelStore(queueSize int) TunnelStore {
|
||||
type tunnelStoreImpl struct {
|
||||
orderedRecords []*TunnelRecord
|
||||
queueSize int
|
||||
subs []chan *TunnelRecord
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (s *tunnelStoreImpl) Subscribe() <-chan *TunnelRecord {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
ch := make(chan *TunnelRecord, 100)
|
||||
|
||||
// Flush Existing & Subscribe
|
||||
for _, r := range s.orderedRecords {
|
||||
ch <- r
|
||||
}
|
||||
s.subs = append(s.subs, ch)
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
func (s *tunnelStoreImpl) Get(before time.Time, count int) ([]*TunnelRecord, bool) {
|
||||
// Find First
|
||||
start := -1
|
||||
@@ -83,7 +85,7 @@ func (s *tunnelStoreImpl) Get(before time.Time, count int) ([]*TunnelRecord, boo
|
||||
return results, more
|
||||
}
|
||||
|
||||
func (s *tunnelStoreImpl) RecordRequest(req *http.Request) {
|
||||
func (s *tunnelStoreImpl) RecordRequest(req *http.Request, sourceAddress string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -93,6 +95,7 @@ func (s *tunnelStoreImpl) RecordRequest(req *http.Request) {
|
||||
Time: time.Now(),
|
||||
URL: &url,
|
||||
Method: req.Method,
|
||||
SourceAddr: sourceAddress,
|
||||
RequestHeaders: req.Header,
|
||||
RequestBodyType: req.Header.Get("Content-Type"),
|
||||
}
|
||||
@@ -116,6 +119,7 @@ func (s *tunnelStoreImpl) RecordResponse(resp *http.Response) error {
|
||||
return ErrRecordNotFound
|
||||
}
|
||||
|
||||
rec.Status = resp.StatusCode
|
||||
rec.ResponseHeaders = resp.Header
|
||||
rec.ResponseBodyType = resp.Header.Get("Content-Type")
|
||||
|
||||
@@ -123,6 +127,8 @@ func (s *tunnelStoreImpl) RecordResponse(resp *http.Response) error {
|
||||
rec.ResponseBody = bodyData
|
||||
}
|
||||
|
||||
s.broadcast(rec)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -133,6 +139,23 @@ func (s *tunnelStoreImpl) RecordTCP() {
|
||||
// TODO
|
||||
}
|
||||
|
||||
func (s *tunnelStoreImpl) broadcast(record *TunnelRecord) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Send to Subscribers
|
||||
active := s.subs[:0]
|
||||
for _, ch := range s.subs {
|
||||
select {
|
||||
case ch <- record:
|
||||
active = append(active, ch)
|
||||
default:
|
||||
close(ch)
|
||||
}
|
||||
}
|
||||
s.subs = active
|
||||
}
|
||||
|
||||
func getRequestBody(req *http.Request) ([]byte, error) {
|
||||
if req.ContentLength == 0 || req.Body == nil || req.Body == http.NoBody {
|
||||
return nil, nil
|
||||
|
||||
Reference in New Issue
Block a user