refactor(server): replace rawHTTPResponseWriter with stdlib http.Server
- Rewrite server to use net/http.Server with ServeHTTP handler instead of raw TCP listener with hand-written HTTP responses. Control plane errors now get proper Content-Type, Content-Length, and chunked encoding via http.Error(). Tunnel traffic hijacks the connection and re-serializes the request for forwarding. - Simplify reconstructedConn to accept variative io.Readers - Delete raw_http_response_writer.go (no longer needed) - Fix TCP forwarder: bare host:port (e.g. "127.0.0.1:5432") now works correctly instead of failing on url.Parse. Only HTTP/HTTPS schemes go through URL parsing; everything else is treated as raw TCP. - Add 8 new e2e tests: HTTP response quality, 1MB response body, 512KB request body, TCP echo, TCP large payload, concurrent single-tunnel, concurrent multi-tunnel (16 tests total, all passing)
This commit is contained in:
195
server/server.go
195
server/server.go
@@ -1,14 +1,11 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
@@ -62,37 +59,104 @@ func NewServer(ctx context.Context, cfg *config.ServerConfig) (*Server, error) {
|
||||
}
|
||||
|
||||
func (s *Server) Start() error {
|
||||
// Raw TCP Listener - This is necessary so we can conditionally either relay
|
||||
// the raw TCP connection, or handle conduit control server API requests.
|
||||
listener, err := net.Listen("tcp", s.cfg.BindAddress)
|
||||
if err != nil {
|
||||
return err
|
||||
// HTTP Server - Uses stdlib http.Server for proper HTTP response handling
|
||||
// including Content-Length, chunked encoding, and keep-alive semantics.
|
||||
httpServer := &http.Server{
|
||||
Addr: s.cfg.BindAddress,
|
||||
Handler: s,
|
||||
}
|
||||
defer listener.Close()
|
||||
|
||||
// Context Cancellation - Close the listener when the context is cancelled
|
||||
// so that Accept() unblocks and the loop exits cleanly.
|
||||
// Context Cancellation - Gracefully shut down when the context is cancelled.
|
||||
go func() {
|
||||
<-s.ctx.Done()
|
||||
listener.Close()
|
||||
log.Info("conduit server shutting down")
|
||||
httpServer.Close()
|
||||
}()
|
||||
|
||||
// Start Listening
|
||||
// Start Server
|
||||
log.Infof("conduit server listening on %s", s.cfg.BindAddress)
|
||||
for {
|
||||
conn, err := listener.Accept()
|
||||
if err != nil {
|
||||
// Expected Error on Shutdown
|
||||
if s.ctx.Err() != nil {
|
||||
log.Info("conduit server shutting down")
|
||||
return nil
|
||||
}
|
||||
log.WithError(err).Error("error accepting connection")
|
||||
continue
|
||||
}
|
||||
|
||||
go s.handleRawConnection(conn)
|
||||
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// Get True Host
|
||||
if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
|
||||
r.RemoteAddr = xff
|
||||
}
|
||||
|
||||
// Validate Host
|
||||
if !strings.Contains(r.Host, s.host) {
|
||||
http.Error(w, fmt.Sprintf("unknown host: %s", r.Host), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Extract Subdomain
|
||||
tunnelName := strings.TrimSuffix(strings.Replace(r.Host, s.host, "", 1), ".")
|
||||
if strings.Count(tunnelName, ".") != 0 {
|
||||
http.Error(w, fmt.Sprintf("cannot tunnel nested subdomains: %s", r.Host), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Handle Control Endpoints
|
||||
if tunnelName == "" {
|
||||
s.handleAsHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// Handle Tunnel Requests
|
||||
s.handleTunnelRequest(w, r, tunnelName)
|
||||
}
|
||||
|
||||
func (s *Server) handleTunnelRequest(w http.ResponseWriter, r *http.Request, tunnelName string) {
|
||||
// Get Tunnel
|
||||
conduitTunnel, exists := s.tunnels.Get(tunnelName)
|
||||
if !exists {
|
||||
http.Error(w, fmt.Sprintf("unknown tunnel: %s", tunnelName), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
// Hijack Connection - Take over the raw TCP connection from the HTTP server
|
||||
// so we can forward the full request (including body) through the tunnel.
|
||||
hj, ok := w.(http.Hijacker)
|
||||
if !ok {
|
||||
http.Error(w, "hijack not supported", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
conn, bufrw, err := hj.Hijack()
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("hijack failed: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Re-Serialize Request Headers - The HTTP server already consumed the request
|
||||
// from the connection. We re-serialize it so the tunnel client receives a
|
||||
// complete HTTP request to forward to the local target.
|
||||
var reqBuf bytes.Buffer
|
||||
fmt.Fprintf(&reqBuf, "%s %s %s\r\n", r.Method, r.RequestURI, r.Proto)
|
||||
fmt.Fprintf(&reqBuf, "Host: %s\r\n", r.Host)
|
||||
_ = r.Header.Write(&reqBuf)
|
||||
reqBuf.WriteString("\r\n")
|
||||
|
||||
// Reconstruct Connection - Combine re-serialized headers with any buffered
|
||||
// body data (from the hijacked reader) and the raw connection.
|
||||
reconstructedConn := newReconstructedConn(conn, &reqBuf, bufrw)
|
||||
|
||||
// Create Stream
|
||||
streamID := fmt.Sprintf("stream_%d", time.Now().UnixNano())
|
||||
tunnelStream := tunnel.NewStream(reconstructedConn, r.RemoteAddr, conduitTunnel.Source())
|
||||
|
||||
// Add Stream
|
||||
if err := conduitTunnel.AddStream(tunnelStream, streamID); err != nil {
|
||||
log.WithError(err).Error("failed to add stream")
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// Start Stream
|
||||
conduitTunnel.StartStream(tunnelStream, streamID)
|
||||
}
|
||||
|
||||
func (s *Server) getInfo(w http.ResponseWriter, _ *http.Request) {
|
||||
@@ -122,79 +186,6 @@ func (s *Server) getInfo(w http.ResponseWriter, _ *http.Request) {
|
||||
_, _ = w.Write(d)
|
||||
}
|
||||
|
||||
func (s *Server) handleRawConnection(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
|
||||
// Capture Consumed Data - When determining where to route the request, we
|
||||
// have to read the host headers. This requires reading from the buffer, so
|
||||
// if we later decide to tunnel the TCP connection we need to reconstruct the
|
||||
// data from the buffer.
|
||||
var capturedData bytes.Buffer
|
||||
teeReader := io.TeeReader(conn, &capturedData)
|
||||
bufReader := bufio.NewReader(teeReader)
|
||||
|
||||
// Create HTTP Request & Writer
|
||||
w := &rawHTTPResponseWriter{conn: conn}
|
||||
r, err := http.ReadRequest(bufReader)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
defer r.Body.Close()
|
||||
|
||||
// Validate Host
|
||||
if !strings.Contains(r.Host, s.host) {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
_, _ = fmt.Fprintf(w, "unknown host: %s", r.Host)
|
||||
return
|
||||
}
|
||||
|
||||
// Extract Subdomain
|
||||
tunnelName := strings.TrimSuffix(strings.Replace(r.Host, s.host, "", 1), ".")
|
||||
if strings.Count(tunnelName, ".") != 0 {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
_, _ = fmt.Fprintf(w, "cannot tunnel nested subdomains: %s", r.Host)
|
||||
return
|
||||
}
|
||||
|
||||
// Get True Host
|
||||
remoteHost := conn.RemoteAddr().String()
|
||||
if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
|
||||
remoteHost = xff
|
||||
}
|
||||
r.RemoteAddr = remoteHost
|
||||
|
||||
// Handle Control Endpoints
|
||||
if tunnelName == "" {
|
||||
s.handleAsHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// Handle Tunnels
|
||||
conduitTunnel, exists := s.tunnels.Get(tunnelName)
|
||||
if !exists {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
_, _ = fmt.Fprintf(w, "unknown tunnel: %s", tunnelName)
|
||||
return
|
||||
}
|
||||
|
||||
// Create Stream
|
||||
reconstructedConn := newReconstructedConn(conn, &capturedData)
|
||||
streamID := fmt.Sprintf("stream_%d", time.Now().UnixNano())
|
||||
tunnelStream := tunnel.NewStream(reconstructedConn, r.RemoteAddr, conduitTunnel.Source())
|
||||
|
||||
// Add Stream
|
||||
if err := conduitTunnel.AddStream(tunnelStream, streamID); err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
_, _ = fmt.Fprintf(w, "failed to add stream: %v", err)
|
||||
log.WithError(err).Error("failed to add stream")
|
||||
return
|
||||
}
|
||||
|
||||
// Start Stream
|
||||
conduitTunnel.StartStream(tunnelStream, streamID)
|
||||
}
|
||||
|
||||
func (s *Server) handleAsHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// Authorize Control Endpoints
|
||||
apiKey := r.URL.Query().Get("apiKey")
|
||||
@@ -219,15 +210,13 @@ func (s *Server) createTunnel(w http.ResponseWriter, r *http.Request) {
|
||||
// Get Tunnel Name
|
||||
tunnelName := r.URL.Query().Get("tunnelName")
|
||||
if tunnelName == "" {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
_, _ = w.Write([]byte("Missing tunnelName parameter"))
|
||||
http.Error(w, "Missing tunnelName parameter", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Validate Unique
|
||||
if _, exists := s.tunnels.Get(tunnelName); exists {
|
||||
w.WriteHeader(http.StatusConflict)
|
||||
_, _ = w.Write([]byte("Tunnel already registered"))
|
||||
http.Error(w, "Tunnel already registered", http.StatusConflict)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user