diff --git a/cmd/tunnel.go b/cmd/tunnel.go index a9f48ab..8806fc1 100644 --- a/cmd/tunnel.go +++ b/cmd/tunnel.go @@ -2,12 +2,17 @@ package cmd import ( "context" + "os" + "os/signal" + "sync" + "syscall" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "reichard.io/conduit/config" "reichard.io/conduit/store" "reichard.io/conduit/tunnel" + "reichard.io/conduit/web" ) var tunnelCmd = &cobra.Command{ @@ -20,22 +25,39 @@ var tunnelCmd = &cobra.Command{ log.Fatal("failed to get client config:", err) } + var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Create Tunnel Store + tunnelStore := store.NewTunnelStore(100) + // Create Forwarder - tunnelForwarder, err := tunnel.NewForwarder(cfg.TunnelTarget, store.NewTunnelStore(100)) + tunnelForwarder, err := tunnel.NewForwarder(cfg.TunnelTarget, tunnelStore) if err != nil { log.Fatal("failed to create tunnel forwarder:", err) } - go tunnelForwarder.Start(ctx) + wg.Go(func() { tunnelForwarder.Start(ctx) }) // Create Tunnel tunnel, err := tunnel.NewClientTunnel(cfg, tunnelForwarder) if err != nil { log.Fatal("failed to create tunnel:", err) } - tunnel.Start(ctx) + wg.Go(func() { tunnel.Start(ctx) }) + + // Create Server + webServer := web.NewWebServer(tunnelStore) + wg.Go(func() { webServer.Start(ctx) }) + + // Wait Interrupt + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + <-sigChan + + log.Println("Shutting Down...") + cancel() + wg.Wait() }, } diff --git a/flake.lock b/flake.lock index fa2eda9..9aa7f11 100644 --- a/flake.lock +++ b/flake.lock @@ -20,16 +20,16 @@ }, "nixpkgs": { "locked": { - "lastModified": 1758216857, - "narHash": "sha256-h1BW2y7CY4LI9w61R02wPaOYfmYo82FyRqHIwukQ6SY=", + "lastModified": 1760038930, + "narHash": "sha256-Oncbh0UmHjSlxO7ErQDM3KM0A5/Znfofj2BSzlHLeVw=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "d2ed99647a4b195f0bcc440f76edfa10aeb3b743", + "rev": "0b4defa2584313f3b781240b29d61f6f9f7e0df3", "type": "github" }, "original": { "owner": "NixOS", - "ref": "nixos-25.05", + "ref": "nixos-unstable", "repo": "nixpkgs", "type": "github" } diff --git a/flake.nix b/flake.nix index 38b37b8..7ce2df7 100644 --- a/flake.nix +++ b/flake.nix @@ -2,7 +2,7 @@ description = "Development Environment"; inputs = { - nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.05"; + nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; flake-utils.url = "github:numtide/flake-utils"; }; diff --git a/go.mod b/go.mod index ef9f828..b5068b1 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module reichard.io/conduit -go 1.24.4 +go 1.25.1 require ( github.com/google/uuid v1.6.0 // indirect @@ -10,4 +10,5 @@ require ( github.com/spf13/cobra v1.10.1 // indirect github.com/spf13/pflag v1.0.9 // indirect golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect + maragu.dev/gomponents v1.2.0 // indirect ) diff --git a/go.sum b/go.sum index f72deec..b2a20fa 100644 --- a/go.sum +++ b/go.sum @@ -22,3 +22,5 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +maragu.dev/gomponents v1.2.0 h1:H7/N5htz1GCnhu0HB1GasluWeU2rJZOYztVEyN61iTc= +maragu.dev/gomponents v1.2.0/go.mod h1:oEDahza2gZoXDoDHhw8jBNgH+3UR5ni7Ur648HORydM= diff --git a/server/server.go b/server/server.go index 964c925..b1ef1ce 100644 --- a/server/server.go +++ b/server/server.go @@ -169,7 +169,7 @@ func (s *Server) handleRawConnection(conn net.Conn) { // Create Stream reconstructedConn := newReconstructedConn(conn, &capturedData) streamID := fmt.Sprintf("stream_%d", time.Now().UnixNano()) - tunnelStream := tunnel.NewStream(reconstructedConn, r.RemoteAddr) + tunnelStream := tunnel.NewStream(reconstructedConn, r.RemoteAddr, conduitTunnel.Source()) // Add Stream if err := conduitTunnel.AddStream(tunnelStream, streamID); err != nil { diff --git a/store/record.go b/store/record.go new file mode 100644 index 0000000..96bfa35 --- /dev/null +++ b/store/record.go @@ -0,0 +1,69 @@ +package store + +import ( + "encoding/json" + "net/http" + "net/url" + "time" + + "github.com/google/uuid" +) + +type TunnelRecord struct { + ID uuid.UUID + Time time.Time + URL *url.URL + Method string + Status int + SourceAddr string + + RequestHeaders http.Header + RequestBodyType string + RequestBody []byte + + ResponseHeaders http.Header + ResponseBodyType string + ResponseBody []byte +} + +func (tr *TunnelRecord) MarshalJSON() ([]byte, error) { + type Alias TunnelRecord + return json.Marshal(&struct { + *Alias + URL string `json:"URL"` + Time string `json:"Time"` + }{ + Alias: (*Alias)(tr), + URL: tr.URL.String(), + Time: tr.Time.Format(time.RFC3339), + }) +} + +func (tr *TunnelRecord) UnmarshalJSON(data []byte) error { + type Alias TunnelRecord + aux := &struct { + *Alias + URL string `json:"URL"` + Time string `json:"Time"` + }{ + Alias: (*Alias)(tr), + } + + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + + parsedURL, err := url.Parse(aux.URL) + if err != nil { + return err + } + tr.URL = parsedURL + + parsedTime, err := time.Parse(time.RFC3339, aux.Time) + if err != nil { + return err + } + tr.Time = parsedTime + + return nil +} diff --git a/store/store.go b/store/store.go index 4fbb840..452a77e 100644 --- a/store/store.go +++ b/store/store.go @@ -6,7 +6,6 @@ import ( "io" "mime" "net/http" - "net/url" "strings" "sync" "time" @@ -21,29 +20,16 @@ const ( var ErrRecordNotFound = errors.New("record not found") +type OnEntryHandler func(record *TunnelRecord) + type TunnelStore interface { Get(before time.Time, count int) (results []*TunnelRecord, more bool) + Subscribe() <-chan *TunnelRecord RecordTCP() - RecordRequest(req *http.Request) + RecordRequest(req *http.Request, sourceAddress string) RecordResponse(resp *http.Response) error } -type TunnelRecord struct { - ID uuid.UUID - Time time.Time - URL *url.URL - Method string - Status int - - RequestHeaders http.Header - RequestBodyType string - RequestBody []byte - - ResponseHeaders http.Header - ResponseBodyType string - ResponseBody []byte -} - func NewTunnelStore(queueSize int) TunnelStore { if queueSize <= 0 { queueSize = defaultQueueSize @@ -57,9 +43,25 @@ func NewTunnelStore(queueSize int) TunnelStore { type tunnelStoreImpl struct { orderedRecords []*TunnelRecord queueSize int + subs []chan *TunnelRecord mu sync.Mutex } +func (s *tunnelStoreImpl) Subscribe() <-chan *TunnelRecord { + s.mu.Lock() + defer s.mu.Unlock() + + ch := make(chan *TunnelRecord, 100) + + // Flush Existing & Subscribe + for _, r := range s.orderedRecords { + ch <- r + } + s.subs = append(s.subs, ch) + + return ch +} + func (s *tunnelStoreImpl) Get(before time.Time, count int) ([]*TunnelRecord, bool) { // Find First start := -1 @@ -83,7 +85,7 @@ func (s *tunnelStoreImpl) Get(before time.Time, count int) ([]*TunnelRecord, boo return results, more } -func (s *tunnelStoreImpl) RecordRequest(req *http.Request) { +func (s *tunnelStoreImpl) RecordRequest(req *http.Request, sourceAddress string) { s.mu.Lock() defer s.mu.Unlock() @@ -93,6 +95,7 @@ func (s *tunnelStoreImpl) RecordRequest(req *http.Request) { Time: time.Now(), URL: &url, Method: req.Method, + SourceAddr: sourceAddress, RequestHeaders: req.Header, RequestBodyType: req.Header.Get("Content-Type"), } @@ -116,6 +119,7 @@ func (s *tunnelStoreImpl) RecordResponse(resp *http.Response) error { return ErrRecordNotFound } + rec.Status = resp.StatusCode rec.ResponseHeaders = resp.Header rec.ResponseBodyType = resp.Header.Get("Content-Type") @@ -123,6 +127,8 @@ func (s *tunnelStoreImpl) RecordResponse(resp *http.Response) error { rec.ResponseBody = bodyData } + s.broadcast(rec) + return nil } @@ -133,6 +139,23 @@ func (s *tunnelStoreImpl) RecordTCP() { // TODO } +func (s *tunnelStoreImpl) broadcast(record *TunnelRecord) { + s.mu.Lock() + defer s.mu.Unlock() + + // Send to Subscribers + active := s.subs[:0] + for _, ch := range s.subs { + select { + case ch <- record: + active = append(active, ch) + default: + close(ch) + } + } + s.subs = active +} + func getRequestBody(req *http.Request) ([]byte, error) { if req.ContentLength == 0 || req.Body == nil || req.Body == http.NoBody { return nil, nil diff --git a/tunnel/forwarder.go b/tunnel/forwarder.go index 92fa763..a8e5f01 100644 --- a/tunnel/forwarder.go +++ b/tunnel/forwarder.go @@ -16,7 +16,7 @@ const ( type Forwarder interface { Type() ForwarderType - Initialize() (Stream, error) + Initialize(sourceAddress string) (Stream, error) Start(context.Context) error } diff --git a/tunnel/http_forwarder.go b/tunnel/http_forwarder.go index f87fc43..5f132d6 100644 --- a/tunnel/http_forwarder.go +++ b/tunnel/http_forwarder.go @@ -33,12 +33,31 @@ func (c *httpConnBuilder) Type() ForwarderType { func (c *httpConnBuilder) Start(ctx context.Context) error { // Create Reverse Proxy Server server := &http.Server{ + ConnContext: func(ctx context.Context, c net.Conn) context.Context { + if wsConn, ok := c.(*wsConn); ok { + return context.WithValue(ctx, "sourceAddr", wsConn.sourceAddress) + } + return ctx + }, Handler: &httputil.ReverseProxy{ Director: func(req *http.Request) { + // Rewrite Request URL req.Host = c.targetURL.Host req.URL.Host = c.targetURL.Host req.URL.Scheme = c.targetURL.Scheme - c.tunnelStore.RecordRequest(req) + + // Rewrite Referer + if referer := req.Header.Get("Referer"); referer != "" { + if refURL, err := url.Parse(referer); err == nil { + refURL.Host = c.targetURL.Host + refURL.Scheme = c.targetURL.Scheme + req.Header.Set("Referer", refURL.String()) + } + } + + // Extract Source Address & Record Request + sourceAddress, _ := req.Context().Value("sourceAddr").(string) + c.tunnelStore.RecordRequest(req, sourceAddress) }, ModifyResponse: c.tunnelStore.RecordResponse, ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { @@ -61,16 +80,16 @@ func (c *httpConnBuilder) Start(ctx context.Context) error { return nil } -func (c *httpConnBuilder) Initialize() (Stream, error) { +func (c *httpConnBuilder) Initialize(sourceAddress string) (Stream, error) { clientConn, serverConn := net.Pipe() - if err := c.multiConnListener.addConn(serverConn); err != nil { + if err := c.multiConnListener.addConn(&wsConn{serverConn, sourceAddress}); err != nil { _ = clientConn.Close() _ = serverConn.Close() return nil, err } - return &streamImpl{clientConn, c.targetURL.String()}, nil + return &streamImpl{clientConn, sourceAddress, c.targetURL.String()}, nil } type multiConnListener struct { @@ -130,3 +149,8 @@ func (l *multiConnListener) addConn(conn net.Conn) error { return fmt.Errorf("connection queue full") } } + +type wsConn struct { + net.Conn + sourceAddress string +} diff --git a/tunnel/stream.go b/tunnel/stream.go index affb4c2..3ec6633 100644 --- a/tunnel/stream.go +++ b/tunnel/stream.go @@ -10,17 +10,23 @@ var _ Stream = (*streamImpl)(nil) type Stream interface { io.ReadWriteCloser Source() string + Target() string } -func NewStream(conn net.Conn, source string) Stream { - return &streamImpl{conn, source} +func NewStream(conn net.Conn, source, target string) Stream { + return &streamImpl{conn, source, target} } type streamImpl struct { net.Conn source string + target string } func (s *streamImpl) Source() string { return s.source } + +func (s *streamImpl) Target() string { + return s.target +} diff --git a/tunnel/tcp_forwarder.go b/tunnel/tcp_forwarder.go index 32153ab..09e2dee 100644 --- a/tunnel/tcp_forwarder.go +++ b/tunnel/tcp_forwarder.go @@ -23,13 +23,13 @@ func (l *tcpConnBuilder) Type() ForwarderType { return ForwarderTCP } -func (l *tcpConnBuilder) Initialize() (Stream, error) { +func (l *tcpConnBuilder) Initialize(sourceAddress string) (Stream, error) { conn, err := net.Dial("tcp", l.target) if err != nil { return nil, err } - return &streamImpl{conn, l.target}, nil + return &streamImpl{conn, sourceAddress, l.target}, nil } func (l *tcpConnBuilder) Start(ctx context.Context) error { diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 6fe48e3..59a28df 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -94,7 +94,7 @@ func (t *Tunnel) Start(ctx context.Context) { } // Get Stream - stream, err := t.getStream(msg.StreamID) + stream, err := t.getStream(msg.StreamID, msg.SourceAddr) if err != nil { if msg.Type != types.MessageTypeClose { log.WithError(err).Errorf("failed to get stream %s", msg.StreamID) @@ -179,7 +179,7 @@ func (t *Tunnel) closeStream(stream Stream, streamID string) error { return stream.Close() } -func (t *Tunnel) getStream(streamID string) (Stream, error) { +func (t *Tunnel) getStream(streamID, sourceAddress string) (Stream, error) { // Check Existing Stream if stream, found := t.streams.Get(streamID); found { return stream, nil @@ -191,7 +191,7 @@ func (t *Tunnel) getStream(streamID string) (Stream, error) { } // Initialize Forwarder & Add Stream - stream, err := t.forwarder.Initialize() + stream, err := t.forwarder.Initialize(sourceAddress) if err != nil { return nil, err } diff --git a/web/pages/network.go b/web/pages/network.go new file mode 100644 index 0000000..be5c6ea --- /dev/null +++ b/web/pages/network.go @@ -0,0 +1,231 @@ +package pages + +import ( + g "maragu.dev/gomponents" + h "maragu.dev/gomponents/html" + + _ "embed" +) + +//go:embed networkScript.js +var alpineScript string + +func NetworkPage() g.Node { + return h.Doctype( + h.HTML( + h.Head( + h.Script(g.Raw(alpineScript)), + h.Script(g.Attr("src", "//cdn.tailwindcss.com")), + h.Script(g.Attr("src", "//cdn.jsdelivr.net/npm/alpinejs@3.x.x/dist/cdn.min.js")), + ), + h.Body( + h.Div(h.Class("bg-gray-900 text-gray-100"), + networkMonitor(), + ), + ), + ), + ) +} + +func networkMonitor() g.Node { + return h.Div( + g.Attr("x-data", "networkMonitor()"), + h.Class("h-dvh flex flex-col"), + + // Header + h.Div( + h.Class("bg-gray-800 border-b border-gray-700 px-4 py-2 flex items-center gap-4"), + h.H1(h.Class("text-lg font-semibold"), g.Text("Network")), + h.Button( + g.Attr("@click", "clear()"), + h.Class("px-3 py-1 bg-gray-700 hover:bg-gray-600 rounded text-sm"), + g.Text("Clear"), + ), + h.Div( + h.Class("ml-auto text-sm text-gray-400"), + h.Span(g.Attr("x-text", "requests.length")), + g.Text(" requests"), + ), + ), + + // Table + h.Div( + h.Class("flex-1 overflow-auto"), + h.Table( + h.Class("w-full text-sm"), + networkTableHeader(), + networkTableBody(), + ), + ), + + // Details Panel + networkDetailsPanel(), + ) +} + +func networkTableHeader() g.Node { + return h.THead( + h.Class("bg-gray-800 sticky top-0 border-b border-gray-700"), + h.Tr( + h.Class("text-left"), + h.Th(h.Class("px-4 py-2 font-medium"), g.Text("Name")), + h.Th(h.Class("px-4 py-2 font-medium w-20"), g.Text("Method")), + h.Th(h.Class("px-4 py-2 font-medium w-20"), g.Text("Status")), + h.Th(h.Class("px-4 py-2 font-medium w-32"), g.Text("Type")), + h.Th(h.Class("px-4 py-2 font-medium w-32"), g.Text("Time")), + ), + ) +} + +func networkTableBody() g.Node { + return h.TBody( + h.Template( + g.Attr("x-for", "req in requests"), + g.Attr(":key", "req.ID"), + h.Tr( + g.Attr("@click", "selected = req"), + g.Attr(":class", "selected?.ID === req.ID ? 'bg-blue-900' : 'hover:bg-gray-800'"), + h.Class("border-b border-gray-800 cursor-pointer"), + h.Td( + h.Class("px-4 py-2 truncate max-w-md"), + g.Attr("x-text", "req.URL?.Path || req.URL"), + ), + h.Td(h.Class("px-4 py-2"), g.Attr("x-text", "req.Method")), + h.Td( + h.Class("px-4 py-2"), + h.Span( + g.Attr(":class", "statusColor(req.Status)"), + g.Attr("x-text", "req.Status || '-'"), + ), + ), + h.Td( + h.Class("px-4 py-2 text-gray-400"), + g.Attr("x-text", "req.ResponseBodyType || '-'"), + ), + h.Td( + h.Class("px-4 py-2 text-gray-400"), + g.Attr("x-text", "formatTime(req.Time)"), + ), + ), + ), + ) +} +func networkDetailsPanel() g.Node { + return h.Div( + g.Attr("x-show", "selected"), + g.Attr("x-data", "{ activeTab: 'general' }"), + h.Class("fixed inset-0 bg-black bg-opacity-50 flex items-center justify-center z-50"), + g.Attr("@click.self", "selected = null"), + h.Div( + h.Class("bg-gray-800 rounded-lg shadow-xl w-3/4 h-3/4 flex flex-col"), + g.Attr("@click.stop", ""), + // Header + h.Div( + h.Class("flex items-center justify-between p-4 border-b border-gray-700"), + h.H2( + h.Class("text-lg font-semibold"), + g.Attr("x-text", "selected?.URL"), + ), + h.Button( + g.Attr("@click", "selected = null"), + h.Class("text-gray-400 hover:text-gray-200"), + g.Text("X"), + ), + ), + // Tabs + h.Div( + h.Class("flex border-b border-gray-700"), + tab("general", "General"), + tab("request", "Request"), + tab("response", "Response"), + ), + // Content + h.Div( + h.Class("flex-1 overflow-auto p-4"), + generalTabContent(), + requestTabContent(), + responseTabContent(), + ), + ), + ) +} + +func tab(name, label string) g.Node { + return h.Button( + g.Attr("@click", "activeTab = '"+name+"'"), + g.Attr(":class", "activeTab === '"+name+"' ? 'border-b-2 border-blue-500 text-blue-500' : 'text-gray-400 hover:text-gray-200'"), + h.Class("px-4 py-2 font-medium"), + g.Text(label), + ) +} + +func generalTabContent() g.Node { + return h.Div( + g.Attr("x-show", "activeTab === 'general'"), + h.Class("space-y-4"), + h.H3(h.Class("font-medium"), g.Text("Details")), + h.Div( + h.Class("text-sm space-y-1 text-gray-300"), + detailRow("URL:", "selected?.URL"), + detailRow("Source:", "selected?.SourceAddr"), + detailRow("Method:", "selected?.Method"), + detailRow("Status:", "selected?.Status"), + ), + ) +} + +func requestTabContent() g.Node { + return h.Div( + g.Attr("x-show", "activeTab === 'request'"), + h.Class("space-y-4"), + h.H3(h.Class("font-medium"), g.Text("Headers")), + h.Div( + h.Class("text-sm space-y-1 text-gray-300 font-mono"), + h.Template( + g.Attr("x-for", "(values, key) in selected?.RequestHeaders"), + h.Div( + h.Span(h.Class("text-gray-500"), g.Attr("x-text", "key + ':'")), + g.Text(" "), + h.Span(g.Attr("x-text", "values.join(', ')")), + ), + ), + ), + h.H3(h.Class("font-medium"), g.Text("Body")), + h.Pre( + h.Class("text-sm text-gray-300 font-mono bg-gray-900 p-3 rounded overflow-auto max-h-96"), + h.Code(g.Attr("x-text", "formatData(selected?.RequestBody)")), + ), + ) +} + +func responseTabContent() g.Node { + return h.Div( + g.Attr("x-show", "activeTab === 'response'"), + h.Class("space-y-4"), + h.H3(h.Class("font-medium"), g.Text("Headers")), + h.Div( + h.Class("text-sm space-y-1 text-gray-300 font-mono mb-4"), + h.Template( + g.Attr("x-for", "(values, key) in selected?.ResponseHeaders"), + h.Div( + h.Span(h.Class("text-gray-500"), g.Attr("x-text", "key + ':'")), + g.Text(" "), + h.Span(g.Attr("x-text", "values.join(', ')")), + ), + ), + ), + h.H3(h.Class("font-medium"), g.Text("Body")), + h.Pre( + h.Class("text-sm text-gray-300 font-mono bg-gray-900 p-3 rounded overflow-auto max-h-96"), + h.Code(g.Attr("x-text", "formatData(selected?.ResponseBody)")), + ), + ) +} + +func detailRow(label, value string) g.Node { + return h.Div( + h.Span(h.Class("text-gray-500"), g.Text(label)), + g.Text(" "), + h.Span(g.Attr("x-text", value)), + ) +} diff --git a/web/pages/networkScript.js b/web/pages/networkScript.js new file mode 100644 index 0000000..dda9162 --- /dev/null +++ b/web/pages/networkScript.js @@ -0,0 +1,47 @@ +function networkMonitor() { + return { + requests: [], + selected: null, + + init() { + const es = new EventSource("/stream"); + es.onmessage = (e) => { + const record = JSON.parse(e.data); + const foundIdx = this.requests.findIndex((r) => r.ID === record.ID); + if (foundIdx >= 0) { + this.requests[foundIdx] = record; + } else { + this.requests.unshift(record); + } + }; + }, + + clear() { + this.requests = []; + this.selected = null; + }, + + statusColor(status) { + if (!status) return "text-gray-400"; + if (status < 300) return "text-green-400"; + if (status < 400) return "text-blue-400"; + if (status < 500) return "text-yellow-400"; + return "text-red-400"; + }, + + formatTime(time) { + return new Date(time).toLocaleTimeString(); + }, + }; +} + +function formatData(base64Data) { + if (!base64Data) return ""; + try { + const decoded = atob(base64Data); + const parsed = JSON.parse(decoded); + return JSON.stringify(parsed, null, 2); + } catch { + return atob(base64Data); + } +} diff --git a/web/web.go b/web/web.go new file mode 100644 index 0000000..244d269 --- /dev/null +++ b/web/web.go @@ -0,0 +1,90 @@ +package web + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + log "github.com/sirupsen/logrus" + "reichard.io/conduit/store" + "reichard.io/conduit/web/pages" +) + +type WebServer struct { + store store.TunnelStore + server *http.Server +} + +func NewWebServer(store store.TunnelStore) *WebServer { + return &WebServer{store: store} +} + +func (s *WebServer) Start(ctx context.Context) error { + log.Info("started tunnel monitor at :8181") + defer log.Info("stopped tunnel monitor") + + rootMux := http.NewServeMux() + rootMux.HandleFunc("/", s.handleRoot) + rootMux.HandleFunc("/stream", s.handleStream) + + s.server = &http.Server{ + Addr: ":8181", + Handler: rootMux, + } + + // Context & Cleanup + go func() { + <-ctx.Done() + s.server.Shutdown(ctx) + }() + + // Start Tunnel Monitor + if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + return err + } + return nil +} + +func (s *WebServer) Stop(ctx context.Context) error { + if s.server == nil { + return nil + } + return s.server.Shutdown(ctx) +} + +func (s *WebServer) handleStream(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + // Flusher Interface Upgrade + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming unsupported", http.StatusInternalServerError) + return + } + + // Stream Data + ch := s.store.Subscribe() + done := r.Context().Done() + + for { + select { + case record, ok := <-ch: + if !ok { + return + } + data, _ := json.Marshal(record) + _, _ = fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() + case <-done: + return + } + } +} + +func (s *WebServer) handleRoot(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html") + _ = pages.NetworkPage().Render(w) +}