feat: add tunnel monitor web ui
Some checks failed
continuous-integration/drone/push Build is failing
Some checks failed
continuous-integration/drone/push Build is failing
This commit is contained in:
@@ -16,7 +16,7 @@ const (
|
||||
|
||||
type Forwarder interface {
|
||||
Type() ForwarderType
|
||||
Initialize() (Stream, error)
|
||||
Initialize(sourceAddress string) (Stream, error)
|
||||
Start(context.Context) error
|
||||
}
|
||||
|
||||
|
||||
@@ -33,12 +33,31 @@ func (c *httpConnBuilder) Type() ForwarderType {
|
||||
func (c *httpConnBuilder) Start(ctx context.Context) error {
|
||||
// Create Reverse Proxy Server
|
||||
server := &http.Server{
|
||||
ConnContext: func(ctx context.Context, c net.Conn) context.Context {
|
||||
if wsConn, ok := c.(*wsConn); ok {
|
||||
return context.WithValue(ctx, "sourceAddr", wsConn.sourceAddress)
|
||||
}
|
||||
return ctx
|
||||
},
|
||||
Handler: &httputil.ReverseProxy{
|
||||
Director: func(req *http.Request) {
|
||||
// Rewrite Request URL
|
||||
req.Host = c.targetURL.Host
|
||||
req.URL.Host = c.targetURL.Host
|
||||
req.URL.Scheme = c.targetURL.Scheme
|
||||
c.tunnelStore.RecordRequest(req)
|
||||
|
||||
// Rewrite Referer
|
||||
if referer := req.Header.Get("Referer"); referer != "" {
|
||||
if refURL, err := url.Parse(referer); err == nil {
|
||||
refURL.Host = c.targetURL.Host
|
||||
refURL.Scheme = c.targetURL.Scheme
|
||||
req.Header.Set("Referer", refURL.String())
|
||||
}
|
||||
}
|
||||
|
||||
// Extract Source Address & Record Request
|
||||
sourceAddress, _ := req.Context().Value("sourceAddr").(string)
|
||||
c.tunnelStore.RecordRequest(req, sourceAddress)
|
||||
},
|
||||
ModifyResponse: c.tunnelStore.RecordResponse,
|
||||
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
|
||||
@@ -61,16 +80,16 @@ func (c *httpConnBuilder) Start(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *httpConnBuilder) Initialize() (Stream, error) {
|
||||
func (c *httpConnBuilder) Initialize(sourceAddress string) (Stream, error) {
|
||||
clientConn, serverConn := net.Pipe()
|
||||
|
||||
if err := c.multiConnListener.addConn(serverConn); err != nil {
|
||||
if err := c.multiConnListener.addConn(&wsConn{serverConn, sourceAddress}); err != nil {
|
||||
_ = clientConn.Close()
|
||||
_ = serverConn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &streamImpl{clientConn, c.targetURL.String()}, nil
|
||||
return &streamImpl{clientConn, sourceAddress, c.targetURL.String()}, nil
|
||||
}
|
||||
|
||||
type multiConnListener struct {
|
||||
@@ -130,3 +149,8 @@ func (l *multiConnListener) addConn(conn net.Conn) error {
|
||||
return fmt.Errorf("connection queue full")
|
||||
}
|
||||
}
|
||||
|
||||
type wsConn struct {
|
||||
net.Conn
|
||||
sourceAddress string
|
||||
}
|
||||
|
||||
@@ -10,17 +10,23 @@ var _ Stream = (*streamImpl)(nil)
|
||||
type Stream interface {
|
||||
io.ReadWriteCloser
|
||||
Source() string
|
||||
Target() string
|
||||
}
|
||||
|
||||
func NewStream(conn net.Conn, source string) Stream {
|
||||
return &streamImpl{conn, source}
|
||||
func NewStream(conn net.Conn, source, target string) Stream {
|
||||
return &streamImpl{conn, source, target}
|
||||
}
|
||||
|
||||
type streamImpl struct {
|
||||
net.Conn
|
||||
source string
|
||||
target string
|
||||
}
|
||||
|
||||
func (s *streamImpl) Source() string {
|
||||
return s.source
|
||||
}
|
||||
|
||||
func (s *streamImpl) Target() string {
|
||||
return s.target
|
||||
}
|
||||
|
||||
@@ -23,13 +23,13 @@ func (l *tcpConnBuilder) Type() ForwarderType {
|
||||
return ForwarderTCP
|
||||
}
|
||||
|
||||
func (l *tcpConnBuilder) Initialize() (Stream, error) {
|
||||
func (l *tcpConnBuilder) Initialize(sourceAddress string) (Stream, error) {
|
||||
conn, err := net.Dial("tcp", l.target)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &streamImpl{conn, l.target}, nil
|
||||
return &streamImpl{conn, sourceAddress, l.target}, nil
|
||||
}
|
||||
|
||||
func (l *tcpConnBuilder) Start(ctx context.Context) error {
|
||||
|
||||
@@ -94,7 +94,7 @@ func (t *Tunnel) Start(ctx context.Context) {
|
||||
}
|
||||
|
||||
// Get Stream
|
||||
stream, err := t.getStream(msg.StreamID)
|
||||
stream, err := t.getStream(msg.StreamID, msg.SourceAddr)
|
||||
if err != nil {
|
||||
if msg.Type != types.MessageTypeClose {
|
||||
log.WithError(err).Errorf("failed to get stream %s", msg.StreamID)
|
||||
@@ -179,7 +179,7 @@ func (t *Tunnel) closeStream(stream Stream, streamID string) error {
|
||||
return stream.Close()
|
||||
}
|
||||
|
||||
func (t *Tunnel) getStream(streamID string) (Stream, error) {
|
||||
func (t *Tunnel) getStream(streamID, sourceAddress string) (Stream, error) {
|
||||
// Check Existing Stream
|
||||
if stream, found := t.streams.Get(streamID); found {
|
||||
return stream, nil
|
||||
@@ -191,7 +191,7 @@ func (t *Tunnel) getStream(streamID string) (Stream, error) {
|
||||
}
|
||||
|
||||
// Initialize Forwarder & Add Stream
|
||||
stream, err := t.forwarder.Initialize()
|
||||
stream, err := t.forwarder.Initialize(sourceAddress)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user